Limit futures dependency to Stream via feature flag (#1774)

In an effort to reach API stability, the `tokio` crate is shedding its
_public_ dependencies on crates that are either a) do not provide a
stable (1.0+) release with longevity guarantees or b) match the `tokio`
release cadence. Of course, implementing `std` traits fits the
requirements.

The on exception, for now, is the `Stream` trait found in `futures_core`.
It is expected that this trait will not change much and be moved into `std.
Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain
a dependency on this trait given how foundational it is.

Since the `Stream` implementation is optional, types that are logically
streams provide `async fn next_*` functions to obtain the next value.
Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`.

Additionally, some misc cleanup is also done:

- `tokio::io::io` -> `tokio::io::util`.
- `delay` -> `delay_until`.
- `Timeout::new` -> `timeout(...)`.
- `signal::ctrl_c()` returns a future instead of a stream.
- `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait).
- `time::Throttle` is removed (due to lack of `Stream` trait).
-  Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
This commit is contained in:
Carl Lerche 2019-11-15 22:11:13 -08:00 committed by GitHub
parent 930679587a
commit 8a7e57786a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
130 changed files with 1763 additions and 1794 deletions

View File

@ -116,18 +116,12 @@ impl Shared {
/// Send a `LineCodec` encoded message to every peer, except
/// for the sender.
async fn broadcast(
&mut self,
sender: SocketAddr,
message: &str,
) -> Result<(), mpsc::error::UnboundedSendError> {
async fn broadcast(&mut self, sender: SocketAddr, message: &str) {
for peer in self.peers.iter_mut() {
if *peer.0 != sender {
peer.1.send(message.into()).await?;
let _ = peer.1.send(message.into());
}
}
Ok(())
}
}
@ -218,7 +212,7 @@ async fn process(
let mut state = state.lock().await;
let msg = format!("{} has joined the chat", username);
println!("{}", msg);
state.broadcast(addr, &msg).await?;
state.broadcast(addr, &msg).await;
}
// Process incoming messages until our stream is exhausted by a disconnect.
@ -230,7 +224,7 @@ async fn process(
let mut state = state.lock().await;
let msg = format!("{}: {}", username, msg);
state.broadcast(addr, &msg).await?;
state.broadcast(addr, &msg).await;
}
// A message was received from a peer. Send it to the
// current user.
@ -254,7 +248,7 @@ async fn process(
let msg = format!("{} has left the chat", username);
println!("{}", msg);
state.broadcast(addr, &msg).await?;
state.broadcast(addr, &msg).await;
}
Ok(())

View File

@ -20,7 +20,7 @@ use tokio::io;
use tokio::sync::{mpsc, oneshot};
use tokio_util::codec::{FramedRead, FramedWrite};
use futures::{SinkExt, Stream, StreamExt};
use futures::{Stream, StreamExt};
use std::env;
use std::error::Error;
use std::net::SocketAddr;
@ -69,12 +69,14 @@ async fn run() -> Result<(), Box<dyn Error>> {
// Temporary work around for stdin blocking the stream
fn stdin() -> impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin {
let mut stdin = FramedRead::new(io::stdin(), codec::Bytes).map(Ok);
let mut stdin = FramedRead::new(io::stdin(), codec::Bytes);
let (mut tx, rx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
tx.send_all(&mut stdin).await.unwrap();
while let Some(res) = stdin.next().await {
let _ = tx.send(res);
}
});
rx

View File

@ -55,9 +55,9 @@
#![warn(rust_2018_idioms)]
use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio_util::codec::{BytesCodec, Decoder};
use futures::StreamExt;
use std::env;
#[tokio::main]

View File

@ -22,7 +22,7 @@
#![warn(rust_2018_idioms)]
use futures::{future::try_join, FutureExt, StreamExt};
use futures::{future::try_join, FutureExt};
use std::{env, error::Error};
use tokio::{
io::AsyncReadExt,
@ -37,9 +37,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
println!("Listening on: {}", listen_addr);
println!("Proxying to: {}", server_addr);
let mut incoming = TcpListener::bind(listen_addr).await?.incoming();
let mut listener = TcpListener::bind(listen_addr).await?;
while let Some(Ok(inbound)) = incoming.next().await {
while let Ok((inbound, _)) = listener.accept().await {
let transfer = transfer(inbound, server_addr.clone()).map(|r| {
if let Err(e) = r {
println!("Failed to transfer; error={}", e);

View File

@ -8,9 +8,8 @@
#![warn(rust_2018_idioms)]
use tokio::future::FutureExt as TokioFutureExt;
use tokio::io;
use tokio::net::UdpSocket;
use tokio::{io, time};
use tokio_util::codec::BytesCodec;
use tokio_util::udp::UdpFramed;
@ -68,7 +67,7 @@ async fn ping(socket: &mut UdpFramed<BytesCodec>, b_addr: SocketAddr) -> Result<
async fn pong(socket: &mut UdpFramed<BytesCodec>) -> Result<(), io::Error> {
let timeout = Duration::from_millis(200);
while let Ok(Some(Ok((bytes, addr)))) = socket.next().timeout(timeout).await {
while let Ok(Some(Ok((bytes, addr)))) = time::timeout(timeout, socket.next()).await {
println!("[b] recv: {}", String::from_utf8_lossy(&bytes));
socket.send((Bytes::from(&b"PONG"[..]), addr)).await?;

View File

@ -5,7 +5,6 @@ use tokio::process::{Child, Command};
use tokio_test::assert_ok;
use futures::future::{self, FutureExt};
use futures::stream::StreamExt;
use std::env;
use std::io;
use std::process::{ExitStatus, Stdio};
@ -47,9 +46,9 @@ async fn feed_cat(mut cat: Child, n: usize) -> io::Result<ExitStatus> {
// (i.e. EOF is reached after `n` lines.
loop {
let data = reader
.next()
.next_line()
.await
.unwrap_or_else(|| Ok(String::new()))
.unwrap_or_else(|_| Some(String::new()))
.expect("failed to read line");
let num_read = data.len();

View File

@ -122,7 +122,7 @@ impl Handle {
/// The next operation in the mock's script will be to expect a `read` call
/// and return `buf`.
pub fn read(&mut self, buf: &[u8]) -> &mut Self {
self.tx.try_send(Action::Read(buf.into())).unwrap();
self.tx.send(Action::Read(buf.into())).unwrap();
self
}
@ -131,7 +131,7 @@ impl Handle {
/// The next operation in the mock's script will be to expect a `write`
/// call.
pub fn write(&mut self, buf: &[u8]) -> &mut Self {
self.tx.try_send(Action::Write(buf.into())).unwrap();
self.tx.send(Action::Write(buf.into())).unwrap();
self
}
}
@ -298,7 +298,7 @@ impl AsyncRead for Mock {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if let Some(rem) = self.inner.remaining_wait() {
let until = Instant::now() + rem;
self.inner.sleep = Some(time::delay(until));
self.inner.sleep = Some(time::delay_until(until));
} else {
self.inner.read_wait = Some(cx.waker().clone());
return Poll::Pending;
@ -340,7 +340,7 @@ impl AsyncWrite for Mock {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if let Some(rem) = self.inner.remaining_wait() {
let until = Instant::now() + rem;
self.inner.sleep = Some(time::delay(until));
self.inner.sleep = Some(time::delay_until(until));
} else {
panic!("unexpected WouldBlock");
}

View File

@ -1,6 +1,6 @@
#![warn(rust_2018_idioms)]
use tokio::time::{delay, Duration, Instant};
use tokio::time::{delay_until, Duration, Instant};
use tokio_test::block_on;
#[test]
@ -20,5 +20,5 @@ fn async_fn() {
#[test]
fn test_delay() {
let deadline = Instant::now() + Duration::from_millis(100);
assert_eq!((), block_on(delay(deadline)));
assert_eq!((), block_on(delay_until(deadline)));
}

View File

@ -524,7 +524,7 @@ async fn client_to_server() {
drop(env_logger::try_init());
// Create a server listening on a port, then figure out what that port is
let srv = t!(TcpListener::bind("127.0.0.1:0").await);
let mut srv = t!(TcpListener::bind("127.0.0.1:0").await);
let addr = t!(srv.local_addr());
let (server_cx, client_cx) = contexts();
@ -559,7 +559,7 @@ async fn server_to_client() {
drop(env_logger::try_init());
// Create a server listening on a port, then figure out what that port is
let srv = t!(TcpListener::bind("127.0.0.1:0").await);
let mut srv = t!(TcpListener::bind("127.0.0.1:0").await);
let addr = t!(srv.local_addr());
let (server_cx, client_cx) = contexts();
@ -590,7 +590,7 @@ async fn one_byte_at_a_time() {
const AMT: usize = 1024;
drop(env_logger::try_init());
let srv = t!(TcpListener::bind("127.0.0.1:0").await);
let mut srv = t!(TcpListener::bind("127.0.0.1:0").await);
let addr = t!(srv.local_addr());
let (server_cx, client_cx) = contexts();

View File

@ -32,7 +32,7 @@ pin-project = "0.4"
tokio = { version = "=0.2.0-alpha.6", path = "../tokio" }
tokio-test = { version = "=0.2.0-alpha.6", path = "../tokio-test" }
futures-util = "0.3.0"
futures = "0.3.0"
[package.metadata.docs.rs]
all-features = true

View File

@ -42,6 +42,7 @@
//! use tokio::prelude::*;
//! use tokio_util::codec::{Framed, LengthDelimitedCodec};
//!
//! use futures::SinkExt;
//! use bytes::Bytes;
//!
//! async fn write_frame<T>(io: T) -> Result<(), Box<dyn std::error::Error>>

View File

@ -5,6 +5,7 @@ use tokio_test::assert_ok;
use tokio_util::codec::{Decoder, Encoder, Framed, FramedParts};
use bytes::{Buf, BufMut, BytesMut, IntoBuf};
use futures::StreamExt;
use std::io::{self, Read};
use std::pin::Pin;
use std::task::{Context, Poll};

View File

@ -1,11 +1,12 @@
#![warn(rust_2018_idioms)]
use tokio::prelude::*;
use tokio::io::AsyncRead;
use tokio_test::assert_ready;
use tokio_test::task;
use tokio_util::codec::{Decoder, FramedRead};
use bytes::{Buf, BytesMut, IntoBuf};
use futures::Stream;
use std::collections::VecDeque;
use std::io;
use std::pin::Pin;

View File

@ -1,7 +1,6 @@
#![warn(rust_2018_idioms)]
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::prelude::*;
use tokio_test::task;
use tokio_test::{
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
@ -9,7 +8,7 @@ use tokio_test::{
use tokio_util::codec::*;
use bytes::{BufMut, Bytes, BytesMut};
use futures_util::pin_mut;
use futures::{pin_mut, Sink, Stream};
use std::collections::VecDeque;
use std::io;
use std::pin::Pin;

View File

@ -3,10 +3,10 @@ use tokio_util::codec::{Decoder, Encoder};
use tokio_util::udp::UdpFramed;
use bytes::{BufMut, BytesMut};
use futures_util::future::try_join;
use futures_util::future::FutureExt;
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use futures::future::try_join;
use futures::future::FutureExt;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use std::io;
#[tokio::test]

View File

@ -32,6 +32,7 @@ default = [
"process",
"rt-full",
"signal",
"stream",
"sync",
"time",
]
@ -40,7 +41,7 @@ blocking = ["rt-core"]
dns = ["blocking"]
fs = ["blocking"]
io-driver = ["mio", "lazy_static", "sync"] # TODO: get rid of sync
io-util = ["pin-project", "memchr"]
io-util = ["pin-project", "pin-project-lite", "memchr"]
macros = ["tokio-macros"]
net = ["dns", "tcp", "udp", "uds"]
process = [
@ -55,6 +56,7 @@ process = [
]
# Includes basic task execution capabilities
rt-core = []
# TODO: rename this -> `rt-threaded`
rt-full = [
"macros",
"num_cpus",
@ -72,6 +74,7 @@ signal = [
"winapi/consoleapi",
"winapi/minwindef",
]
stream = ["futures-core"]
sync = ["fnv"]
test-util = []
tcp = ["io-driver"]
@ -84,18 +87,17 @@ uds = ["io-driver", "mio-uds", "libc"]
tokio-macros = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-macros" }
bytes = "0.4"
futures-core = "0.3.0"
futures-sink = "0.3.0"
futures-util = { version = "0.3.0", features = ["sink", "channel"] }
iovec = "0.1"
# Everything else is optional...
fnv = { version = "1.0.6", optional = true }
futures-core = { version = "0.3.0", optional = true }
lazy_static = { version = "1.0.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.6.14", optional = true }
num_cpus = { version = "1.8.0", optional = true }
pin-project = { version = "0.4", optional = true }
pin-project-lite = { version = "0.1", optional = true }
# Backs `DelayQueue`
slab = { version = "0.4.1", optional = true }

View File

@ -1,7 +1,6 @@
use crate::fs::sys;
use crate::io::{AsyncRead, AsyncWrite};
use futures_core::ready;
use std::cmp;
use std::future::Future;
use std::io;

View File

@ -7,7 +7,6 @@ use crate::fs::blocking::Buf;
use crate::fs::{asyncify, sys};
use crate::io::{AsyncRead, AsyncWrite};
use futures_core::ready;
use std::fmt;
use std::fs::{Metadata, Permissions};
use std::future::Future;
@ -430,7 +429,7 @@ impl File {
}
async fn complete_inflight(&mut self) {
use futures_util::future::poll_fn;
use crate::future::poll_fn;
if let Err(e) = poll_fn(|cx| Pin::new(&mut *self).poll_flush(cx)).await {
self.last_write_err = Some(e.kind());

View File

@ -1,7 +1,5 @@
use crate::fs::{asyncify, sys};
use futures_core::ready;
use futures_core::stream::Stream;
use std::ffi::OsString;
use std::fs::{FileType, Metadata};
use std::future::Future;
@ -50,10 +48,15 @@ enum State {
Pending(sys::Blocking<(Option<io::Result<std::fs::DirEntry>>, std::fs::ReadDir)>),
}
impl Stream for ReadDir {
type Item = io::Result<DirEntry>;
impl ReadDir {
/// Returns the next entry in the directory stream.
pub async fn next_entry(&mut self) -> io::Result<Option<DirEntry>> {
use crate::future::poll_fn;
poll_fn(|cx| self.poll_next_entry(cx)).await
}
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
#[doc(hidden)]
pub fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<Option<DirEntry>>> {
loop {
match self.0 {
State::Idle(ref mut std) => {
@ -68,7 +71,11 @@ impl Stream for ReadDir {
let (ret, std) = ready!(Pin::new(rx).poll(cx))?;
self.0 = State::Idle(Some(std));
let ret = ret.map(|res| res.map(|std| DirEntry(Arc::new(std))));
let ret = match ret {
Some(Ok(std)) => Ok(Some(DirEntry(Arc::new(std)))),
Some(Err(e)) => Err(e),
None => Ok(None),
};
return Poll::Ready(ret);
}
@ -77,6 +84,19 @@ impl Stream for ReadDir {
}
}
#[cfg(feature = "stream")]
impl futures_core::Stream for ReadDir {
type Item = io::Result<DirEntry>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(match ready!(self.poll_next_entry(cx)) {
Ok(Some(entry)) => Some(Ok(entry)),
Ok(None) => None,
Err(err) => Some(Err(err)),
})
}
}
/// Entries returned by the [`ReadDir`] stream.
///
/// [`ReadDir`]: struct.ReadDir.html
@ -100,13 +120,11 @@ impl DirEntry {
///
/// ```no_run
/// use tokio::fs;
/// use tokio::prelude::*;
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut entries = fs::read_dir(".").await?;
///
/// while let Some(res) = entries.next().await {
/// let entry = res?;
/// while let Some(entry) = entries.next_entry().await? {
/// println!("{:?}", entry.path());
/// }
/// # Ok(())
@ -133,13 +151,11 @@ impl DirEntry {
///
/// ```
/// use tokio::fs;
/// use tokio::prelude::*;
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut entries = fs::read_dir(".").await?;
///
/// while let Some(res) = entries.next().await {
/// let entry = res?;
/// while let Some(entry) = entries.next_entry().await? {
/// println!("{:?}", entry.file_name());
/// }
/// # Ok(())
@ -164,14 +180,11 @@ impl DirEntry {
///
/// ```
/// use tokio::fs;
/// use tokio::prelude::*;
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut entries = fs::read_dir(".").await?;
///
/// while let Some(res) = entries.next().await {
/// let entry = res?;
///
/// while let Some(entry) = entries.next_entry().await? {
/// if let Ok(metadata) = entry.metadata().await {
/// // Now let's show our entry's permissions!
/// println!("{:?}: {:?}", entry.path(), metadata.permissions());
@ -202,14 +215,11 @@ impl DirEntry {
///
/// ```
/// use tokio::fs;
/// use tokio::prelude::*;
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut entries = fs::read_dir(".").await?;
///
/// while let Some(res) = entries.next().await {
/// let entry = res?;
///
/// while let Some(entry) = entries.next_entry().await? {
/// if let Ok(file_type) = entry.file_type().await {
/// // Now let's show our entry's file type!
/// println!("{:?}: {:?}", entry.path(), file_type);

View File

@ -1,69 +0,0 @@
//! Asynchronous values.
#[cfg(feature = "time")]
use crate::time::Timeout;
#[cfg(feature = "time")]
use std::time::Duration;
#[doc(inline)]
pub use futures_util::future::{err, ok, pending, poll_fn, ready};
#[doc(inline)]
pub use std::future::Future;
/// An extension trait for `Future` that provides a variety of convenient
/// combinator functions.
///
/// Currently, there only is a [`timeout`] function, but this will increase
/// over time.
///
/// Users are not expected to implement this trait. All types that implement
/// `Future` already implement `FutureExt`.
///
/// This trait can be imported directly or via the Tokio prelude: `use
/// tokio::prelude::*`.
///
/// [`timeout`]: #method.timeout
pub trait FutureExt: Future {
/// Creates a new future which allows `self` until `timeout`.
///
/// This combinator creates a new future which wraps the receiving future
/// with a timeout. The returned future is allowed to execute until it
/// completes or `timeout` has elapsed, whichever happens first.
///
/// If the future completes before `timeout` then the future will resolve
/// with that item. Otherwise the future will resolve to an error.
///
/// The future is guaranteed to be polled at least once, even if `timeout`
/// is set to zero.
///
/// # Examples
///
/// ```
/// use tokio::prelude::*;
/// use std::time::Duration;
///
/// async fn long_future() {
/// // do work here
/// }
///
/// # async fn dox() {
/// let res = long_future()
/// .timeout(Duration::from_secs(1))
/// .await;
///
/// if res.is_err() {
/// println!("operation timed out");
/// }
/// # }
/// ```
#[cfg(feature = "time")]
fn timeout(self, timeout: Duration) -> Timeout<Self>
where
Self: Sized,
{
Timeout::new(self, timeout)
}
}
impl<T: ?Sized> FutureExt for T where T: Future {}

View File

@ -0,0 +1,76 @@
//! Definition of the MaybeDone combinator
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
/// A future that may have completed.
#[derive(Debug)]
pub(crate) enum MaybeDone<Fut: Future> {
/// A not-yet-completed future
Future(Fut),
/// The output of the completed future
Done(Fut::Output),
/// The empty variant after the result of a [`MaybeDone`] has been
/// taken using the [`take_output`](MaybeDone::take_output) method.
Gone,
}
// Safe because we never generate `Pin<&mut Fut::Output>`
impl<Fut: Future + Unpin> Unpin for MaybeDone<Fut> {}
/// Wraps a future into a `MaybeDone`
pub(crate) fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> {
MaybeDone::Future(future)
}
impl<Fut: Future> MaybeDone<Fut> {
/// Returns an [`Option`] containing a mutable reference to the output of the future.
/// The output of this method will be [`Some`] if and only if the inner
/// future has been completed and [`take_output`](MaybeDone::take_output)
/// has not yet been called.
pub(crate) fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Output> {
unsafe {
let this = self.get_unchecked_mut();
match this {
MaybeDone::Done(res) => Some(res),
_ => None,
}
}
}
/// Attempt to take the output of a `MaybeDone` without driving it
/// towards completion.
#[inline]
pub(crate) fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> {
unsafe {
let this = self.get_unchecked_mut();
match this {
MaybeDone::Done(_) => {}
MaybeDone::Future(_) | MaybeDone::Gone => return None,
};
if let MaybeDone::Done(output) = mem::replace(this, MaybeDone::Gone) {
Some(output)
} else {
unreachable!()
}
}
}
}
impl<Fut: Future> Future for MaybeDone<Fut> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let res = unsafe {
match self.as_mut().get_unchecked_mut() {
MaybeDone::Future(a) => ready!(Pin::new_unchecked(a).poll(cx)),
MaybeDone::Done(_) => return Poll::Ready(()),
MaybeDone::Gone => panic!("MaybeDone polled after value taken"),
}
};
self.set(MaybeDone::Done(res));
Poll::Ready(())
}
}

15
tokio/src/future/mod.rs Normal file
View File

@ -0,0 +1,15 @@
#![allow(unused_imports, dead_code)]
//! Asynchronous values.
mod maybe_done;
pub(crate) use maybe_done::{maybe_done, MaybeDone};
mod poll_fn;
pub(crate) use poll_fn::poll_fn;
mod ready;
pub(crate) use ready::{ok, Ready};
mod try_join;
pub(crate) use try_join::try_join3;

View File

@ -0,0 +1,44 @@
use std::future::Future;
use std::marker;
use sdt::pin::Pin;
use std::task::{Context, Poll};
/// Future for the [`pending()`] function.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
struct Pending<T> {
_data: marker::PhantomData<T>,
}
/// Creates a future which never resolves, representing a computation that never
/// finishes.
///
/// The returned future will forever return [`Poll::Pending`].
///
/// # Examples
///
/// ```no_run
/// use tokio::future;
///
/// #[tokio::main]
/// async fn main {
/// future::pending().await;
/// unreachable!();
/// }
/// ```
pub async fn pending() -> ! {
Pending {
_data: marker::PhantomData,
}.await
}
impl<T> Future for Pending<T> {
type Output = !;
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<T> {
Poll::Pending
}
}
impl<T> Unpin for Pending<T> {
}

View File

@ -0,0 +1,38 @@
//! Definition of the `PollFn` adapter combinator
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Future for the [`poll_fn`] function.
pub(crate) struct PollFn<F> {
f: F,
}
impl<F> Unpin for PollFn<F> {}
/// Creates a new future wrapping around a function returning [`Poll`].
pub(crate) fn poll_fn<T, F>(f: F) -> PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
PollFn { f }
}
impl<F> fmt::Debug for PollFn<F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PollFn").finish()
}
}
impl<T, F> Future for PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
(&mut self.f)(cx)
}
}

27
tokio/src/future/ready.rs Normal file
View File

@ -0,0 +1,27 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Future for the [`ready`](ready()) function.
///
/// `pub` in order to use the future as an associated type in a sealed trait.
#[derive(Debug)]
// Used as an associated type in a "sealed" trait.
#[allow(unreachable_pub)]
pub struct Ready<T>(Option<T>);
impl<T> Unpin for Ready<T> {}
impl<T> Future for Ready<T> {
type Output = T;
#[inline]
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
Poll::Ready(self.0.take().unwrap())
}
}
/// Create a future that is immediately ready with a success value.
pub(crate) fn ok<T, E>(t: T) -> Ready<Result<T, E>> {
Ready(Some(Ok(t)))
}

View File

@ -0,0 +1,115 @@
use crate::future::{maybe_done, MaybeDone};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
pub(crate) fn try_join3<T1, F1, T2, F2, T3, F3, E>(
future1: F1,
future2: F2,
future3: F3,
) -> TryJoin3<F1, F2, F3>
where
F1: Future<Output = Result<T1, E>>,
F2: Future<Output = Result<T2, E>>,
F3: Future<Output = Result<T3, E>>,
{
TryJoin3 {
future1: maybe_done(future1),
future2: maybe_done(future2),
future3: maybe_done(future3),
}
}
pub(crate) struct TryJoin3<F1, F2, F3>
where
F1: Future,
F2: Future,
F3: Future,
{
future1: MaybeDone<F1>,
future2: MaybeDone<F2>,
future3: MaybeDone<F3>,
}
impl<T1, F1, T2, F2, T3, F3, E> Future for TryJoin3<F1, F2, F3>
where
F1: Future<Output = Result<T1, E>>,
F2: Future<Output = Result<T2, E>>,
F3: Future<Output = Result<T3, E>>,
{
type Output = Result<(T1, T2, T3), E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut all_done = true;
// Safety: the fn takes `Pin`, we don't move any data out of `self`.
unsafe {
let me = self.get_unchecked_mut();
if Pin::new_unchecked(&mut me.future1).poll(cx).is_pending() {
all_done = false;
} else if Pin::new_unchecked(&mut me.future1)
.output_mut()
.unwrap()
.is_err()
{
return Poll::Ready(Err(Pin::new_unchecked(&mut me.future1)
.take_output()
.unwrap()
.err()
.unwrap()));
}
if Pin::new_unchecked(&mut me.future2).poll(cx).is_pending() {
all_done = false;
} else if Pin::new_unchecked(&mut me.future2)
.output_mut()
.unwrap()
.is_err()
{
return Poll::Ready(Err(Pin::new_unchecked(&mut me.future2)
.take_output()
.unwrap()
.err()
.unwrap()));
}
if Pin::new_unchecked(&mut me.future3).poll(cx).is_pending() {
all_done = false;
} else if Pin::new_unchecked(&mut me.future3)
.output_mut()
.unwrap()
.is_err()
{
return Poll::Ready(Err(Pin::new_unchecked(&mut me.future3)
.take_output()
.unwrap()
.err()
.unwrap()));
}
if all_done {
Poll::Ready(Ok((
Pin::new_unchecked(&mut me.future1)
.take_output()
.unwrap()
.ok()
.unwrap(),
Pin::new_unchecked(&mut me.future2)
.take_output()
.unwrap()
.ok()
.unwrap(),
Pin::new_unchecked(&mut me.future3)
.take_output()
.unwrap()
.ok()
.unwrap(),
)))
} else {
Poll::Pending
}
}
}
}

View File

@ -1,5 +1,4 @@
use bytes::BufMut;
use futures_core::ready;
use std::io;
use std::ops::DerefMut;
use std::pin::Pin;

View File

@ -1,5 +1,4 @@
use bytes::Buf;
use futures_core::ready;
use std::io;
use std::ops::DerefMut;
use std::pin::Pin;

View File

@ -1,70 +0,0 @@
use crate::io::io::read_line::read_line_internal;
use crate::io::AsyncBufRead;
use futures_core::{ready, Stream};
use pin_project::{pin_project, project};
use std::io;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Stream for the [`lines`](crate::io::AsyncBufReadExt::lines) method.
#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Lines<R> {
#[pin]
reader: R,
buf: String,
bytes: Vec<u8>,
read: usize,
}
pub(crate) fn lines<R>(reader: R) -> Lines<R>
where
R: AsyncBufRead,
{
Lines {
reader,
buf: String::new(),
bytes: Vec::new(),
read: 0,
}
}
impl<R: AsyncBufRead> Stream for Lines<R> {
type Item = io::Result<String>;
#[project]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
#[project]
let Lines {
reader,
buf,
bytes,
read,
} = self.project();
let n = ready!(read_line_internal(reader, cx, buf, bytes, read))?;
if n == 0 && buf.is_empty() {
return Poll::Ready(None);
}
if buf.ends_with('\n') {
buf.pop();
if buf.ends_with('\r') {
buf.pop();
}
}
Poll::Ready(Some(Ok(mem::replace(buf, String::new()))))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn assert_unpin() {
crate::is_unpin::<Lines<()>>();
}
}

View File

@ -1,67 +0,0 @@
use crate::io::io::read_until::read_until_internal;
use crate::io::AsyncBufRead;
use futures_core::{ready, Stream};
use pin_project::{pin_project, project};
use std::io;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Stream for the [`split`](crate::io::AsyncBufReadExt::split) method.
#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Split<R> {
#[pin]
reader: R,
buf: Vec<u8>,
delim: u8,
read: usize,
}
pub(crate) fn split<R>(reader: R, delim: u8) -> Split<R>
where
R: AsyncBufRead,
{
Split {
reader,
buf: Vec::new(),
delim,
read: 0,
}
}
impl<R: AsyncBufRead> Stream for Split<R> {
type Item = io::Result<Vec<u8>>;
#[project]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
#[project]
let Split {
reader,
buf,
delim,
read,
} = self.project();
let n = ready!(read_until_internal(reader, cx, *delim, buf, read))?;
if n == 0 && buf.is_empty() {
return Poll::Ready(None);
}
if buf.last() == Some(&delim) {
buf.pop();
}
Poll::Ready(Some(Ok(mem::replace(buf, Vec::new()))))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn assert_unpin() {
crate::is_unpin::<Split<()>>();
}
}

View File

@ -45,20 +45,19 @@ pub use self::async_read::AsyncRead;
mod async_write;
pub use self::async_write::AsyncWrite;
#[allow(clippy::module_inception)] // TODO: remove
#[cfg(feature = "io-util")]
mod io;
#[cfg(feature = "io-util")]
pub use self::io::{
copy, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufStream,
BufWriter, Copy, Empty, Repeat, Sink, Take,
};
#[cfg(feature = "io-util")]
pub mod split;
#[cfg(feature = "io-util")]
pub use self::split::split;
#[cfg(feature = "io-util")]
mod util;
#[cfg(feature = "io-util")]
pub use self::util::{
copy, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufStream,
BufWriter, Copy, Empty, Lines, Repeat, Sink, Split, Take,
};
// TODO: These should not be guarded by `fs`
#[cfg(feature = "fs")]

View File

@ -7,7 +7,6 @@
use crate::io::{AsyncRead, AsyncWrite};
use bytes::{Buf, BufMut};
use futures_core::ready;
use std::cell::UnsafeCell;
use std::fmt;
use std::io;

View File

@ -1,7 +1,7 @@
use crate::io::io::lines::{lines, Lines};
use crate::io::io::read_line::{read_line, ReadLine};
use crate::io::io::read_until::{read_until, ReadUntil};
use crate::io::io::split::{split, Split};
use crate::io::util::lines::{lines, Lines};
use crate::io::util::read_line::{read_line, ReadLine};
use crate::io::util::read_until::{read_until, ReadUntil};
use crate::io::util::split::{split, Split};
use crate::io::AsyncBufRead;
/// An extension trait which adds utility methods to `AsyncBufRead` types.
@ -59,7 +59,7 @@ pub trait AsyncBufReadExt: AsyncBufRead {
/// Returns a stream of the contents of this reader split on the byte
/// `byte`.
///
/// This method is the async equivalent to
/// This method is the asynchronous equivalent to
/// [`BufRead::split`](std::io::BufRead::split).
///
/// The stream returned from this function will yield instances of
@ -73,9 +73,25 @@ pub trait AsyncBufReadExt: AsyncBufRead {
///
/// Each item of the stream has the same error semantics as
/// [`AsyncBufReadExt::read_until`](AsyncBufReadExt::read_until).
///
/// # Examples
///
/// ```
/// # use tokio::io::AsyncBufRead;
/// use tokio::io::AsyncBufReadExt;
///
/// # async fn dox(my_buf_read: impl AsyncBufRead + Unpin) -> std::io::Result<()> {
/// let mut segments = my_buf_read.split(b'f');
///
/// while let Some(segment) = segments.next_segment().await? {
/// println!("length = {}", segment.len())
/// }
/// # Ok(())
/// # }
/// ```
fn split(self, byte: u8) -> Split<Self>
where
Self: Sized,
Self: Sized + Unpin,
{
split(self, byte)
}

View File

@ -1,10 +1,10 @@
use crate::io::io::chain::{chain, Chain};
use crate::io::io::copy::{copy, Copy};
use crate::io::io::read::{read, Read};
use crate::io::io::read_exact::{read_exact, ReadExact};
use crate::io::io::read_to_end::{read_to_end, ReadToEnd};
use crate::io::io::read_to_string::{read_to_string, ReadToString};
use crate::io::io::take::{take, Take};
use crate::io::util::chain::{chain, Chain};
use crate::io::util::copy::{copy, Copy};
use crate::io::util::read::{read, Read};
use crate::io::util::read_exact::{read_exact, ReadExact};
use crate::io::util::read_to_end::{read_to_end, ReadToEnd};
use crate::io::util::read_to_string::{read_to_string, ReadToString};
use crate::io::util::take::{take, Take};
use crate::io::{AsyncRead, AsyncWrite};
/// An extension trait which adds utility methods to `AsyncRead` types.

View File

@ -1,7 +1,7 @@
use crate::io::io::flush::{flush, Flush};
use crate::io::io::shutdown::{shutdown, Shutdown};
use crate::io::io::write::{write, Write};
use crate::io::io::write_all::{write_all, WriteAll};
use crate::io::util::flush::{flush, Flush};
use crate::io::util::shutdown::{shutdown, Shutdown};
use crate::io::util::write::{write, Write};
use crate::io::util::write_all::{write_all, WriteAll};
use crate::io::AsyncWrite;
/// An extension trait which adds utility methods to `AsyncWrite` types.

View File

@ -1,7 +1,6 @@
use crate::io::io::DEFAULT_BUF_SIZE;
use crate::io::util::DEFAULT_BUF_SIZE;
use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite};
use futures_core::ready;
use pin_project::{pin_project, project};
use std::io::{self, Read};
use std::pin::Pin;

View File

@ -1,4 +1,4 @@
use crate::io::io::{BufReader, BufWriter};
use crate::io::util::{BufReader, BufWriter};
use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite};
use pin_project::pin_project;

View File

@ -1,7 +1,6 @@
use crate::io::io::DEFAULT_BUF_SIZE;
use crate::io::util::DEFAULT_BUF_SIZE;
use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite};
use futures_core::ready;
use pin_project::{pin_project, project};
use std::fmt;
use std::io::{self, Write};

View File

@ -1,6 +1,5 @@
use crate::io::{AsyncBufRead, AsyncRead};
use futures_core::ready;
use pin_project::{pin_project, project};
use std::fmt;
use std::io;

View File

@ -1,6 +1,5 @@
use crate::io::{AsyncRead, AsyncWrite};
use futures_core::ready;
use std::future::Future;
use std::io;
use std::pin::Pin;

113
tokio/src/io/util/lines.rs Normal file
View File

@ -0,0 +1,113 @@
use crate::io::util::read_line::read_line_internal;
use crate::io::AsyncBufRead;
use pin_project_lite::pin_project;
use std::io;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
pin_project! {
/// Stream for the [`lines`](crate::io::AsyncBufReadExt::lines) method.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Lines<R> {
#[pin]
reader: R,
buf: String,
bytes: Vec<u8>,
read: usize,
}
}
pub(crate) fn lines<R>(reader: R) -> Lines<R>
where
R: AsyncBufRead,
{
Lines {
reader,
buf: String::new(),
bytes: Vec::new(),
read: 0,
}
}
impl<R> Lines<R>
where
R: AsyncBufRead + Unpin,
{
/// Returns the next line in the stream.
///
/// # Examples
///
/// ```
/// # use tokio::io::AsyncBufRead;
/// use tokio::io::AsyncBufReadExt;
///
/// # async fn dox(my_buf_read: impl AsyncBufRead + Unpin) -> std::io::Result<()> {
/// let mut lines = my_buf_read.lines();
///
/// while let Some(line) = lines.next_line().await? {
/// println!("length = {}", line.len())
/// }
/// # Ok(())
/// # }
/// ```
pub async fn next_line(&mut self) -> io::Result<Option<String>> {
use crate::future::poll_fn;
poll_fn(|cx| Pin::new(&mut *self).poll_next_line(cx)).await
}
}
impl<R> Lines<R>
where
R: AsyncBufRead,
{
#[doc(hidden)]
pub fn poll_next_line(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<Option<String>>> {
let me = self.project();
let n = ready!(read_line_internal(me.reader, cx, me.buf, me.bytes, me.read))?;
if n == 0 && me.buf.is_empty() {
return Poll::Ready(Ok(None));
}
if me.buf.ends_with('\n') {
me.buf.pop();
if me.buf.ends_with('\r') {
me.buf.pop();
}
}
Poll::Ready(Ok(Some(mem::replace(me.buf, String::new()))))
}
}
#[cfg(feature = "stream")]
impl<R: AsyncBufRead> futures_core::Stream for Lines<R> {
type Item = io::Result<String>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(match ready!(self.poll_next_line(cx)) {
Ok(Some(line)) => Some(Ok(line)),
Ok(None) => None,
Err(err) => Some(Err(err)),
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn assert_unpin() {
crate::is_unpin::<Lines<()>>();
}
}

View File

@ -1,51 +1,71 @@
mod async_buf_read_ext;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::async_buf_read_ext::AsyncBufReadExt;
mod async_read_ext;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::async_read_ext::AsyncReadExt;
mod async_write_ext;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::async_write_ext::AsyncWriteExt;
mod buf_reader;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::buf_reader::BufReader;
mod buf_stream;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::buf_stream::BufStream;
mod buf_writer;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::buf_writer::BufWriter;
mod chain;
mod copy;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::copy::{copy, Copy};
mod empty;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::empty::{empty, Empty};
mod flush;
mod lines;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::lines::Lines;
mod read;
mod read_exact;
mod read_line;
mod read_to_end;
mod read_to_string;
mod read_until;
mod repeat;
mod shutdown;
mod sink;
mod split;
mod take;
mod write;
mod write_all;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::async_buf_read_ext::AsyncBufReadExt;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::async_read_ext::AsyncReadExt;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::async_write_ext::AsyncWriteExt;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::buf_reader::BufReader;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::buf_stream::BufStream;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::buf_writer::BufWriter;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::copy::{copy, Copy};
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::empty::{empty, Empty};
mod repeat;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::repeat::{repeat, Repeat};
mod shutdown;
mod sink;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::sink::{sink, Sink};
mod split;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::split::Split;
mod take;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::take::Take;
mod write;
mod write_all;
// used by `BufReader` and `BufWriter`
// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
const DEFAULT_BUF_SIZE: usize = 8 * 1024;

View File

@ -1,6 +1,5 @@
use crate::io::AsyncRead;
use futures_core::ready;
use std::future::Future;
use std::io;
use std::marker::Unpin;

View File

@ -1,7 +1,6 @@
use crate::io::io::read_until::read_until_internal;
use crate::io::util::read_until::read_until_internal;
use crate::io::AsyncBufRead;
use futures_core::ready;
use std::future::Future;
use std::io;
use std::mem;

View File

@ -1,6 +1,5 @@
use crate::io::AsyncRead;
use futures_core::ready;
use std::future::Future;
use std::io;
use std::pin::Pin;

View File

@ -1,7 +1,6 @@
use crate::io::io::read_to_end::read_to_end_internal;
use crate::io::util::read_to_end::read_to_end_internal;
use crate::io::AsyncRead;
use futures_core::ready;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

View File

@ -1,6 +1,5 @@
use crate::io::AsyncBufRead;
use futures_core::ready;
use std::future::Future;
use std::io;
use std::mem;

111
tokio/src/io/util/split.rs Normal file
View File

@ -0,0 +1,111 @@
use crate::io::util::read_until::read_until_internal;
use crate::io::AsyncBufRead;
use pin_project_lite::pin_project;
use std::io;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
pin_project! {
/// Stream for the [`split`](crate::io::AsyncBufReadExt::split) method.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Split<R> {
#[pin]
reader: R,
buf: Vec<u8>,
delim: u8,
read: usize,
}
}
pub(crate) fn split<R>(reader: R, delim: u8) -> Split<R>
where
R: AsyncBufRead,
{
Split {
reader,
buf: Vec::new(),
delim,
read: 0,
}
}
impl<R> Split<R>
where
R: AsyncBufRead + Unpin,
{
/// Returns the next segment in the stream.
///
/// # Examples
///
/// ```
/// # use tokio::io::AsyncBufRead;
/// use tokio::io::AsyncBufReadExt;
///
/// # async fn dox(my_buf_read: impl AsyncBufRead + Unpin) -> std::io::Result<()> {
/// let mut segments = my_buf_read.split(b'f');
///
/// while let Some(segment) = segments.next_segment().await? {
/// println!("length = {}", segment.len())
/// }
/// # Ok(())
/// # }
/// ```
pub async fn next_segment(&mut self) -> io::Result<Option<Vec<u8>>> {
use crate::future::poll_fn;
poll_fn(|cx| Pin::new(&mut *self).poll_next_segment(cx)).await
}
}
impl<R> Split<R>
where
R: AsyncBufRead,
{
#[doc(hidden)]
pub fn poll_next_segment(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<Option<Vec<u8>>>> {
let me = self.project();
let n = ready!(read_until_internal(
me.reader, cx, *me.delim, me.buf, me.read,
))?;
if n == 0 && me.buf.is_empty() {
return Poll::Ready(Ok(None));
}
if me.buf.last() == Some(me.delim) {
me.buf.pop();
}
Poll::Ready(Ok(Some(mem::replace(me.buf, Vec::new()))))
}
}
#[cfg(feature = "stream")]
impl<R: AsyncBufRead> futures_core::Stream for Split<R> {
type Item = io::Result<Vec<u8>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(match ready!(self.poll_next_segment(cx)) {
Ok(Some(segment)) => Some(Ok(segment)),
Ok(None) => None,
Err(err) => Some(Err(err)),
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn assert_unpin() {
crate::is_unpin::<Split<()>>();
}
}

View File

@ -1,6 +1,5 @@
use crate::io::{AsyncBufRead, AsyncRead};
use futures_core::ready;
use pin_project::{pin_project, project};
use std::pin::Pin;
use std::task::{Context, Poll};

View File

@ -1,6 +1,5 @@
use crate::io::AsyncWrite;
use futures_core::ready;
use std::future::Future;
use std::io;
use std::mem;

View File

@ -75,6 +75,15 @@ macro_rules! thread_local {
($($tts:tt)+) => { loom::thread_local!{ $($tts)+ } }
}
macro_rules! ready {
($e:expr $(,)?) => {
match $e {
std::task::Poll::Ready(t) => t,
std::task::Poll::Pending => return std::task::Poll::Pending,
}
};
}
// At the top due to macros
#[cfg(test)]
#[macro_use]
@ -86,7 +95,7 @@ pub mod blocking;
#[cfg(feature = "fs")]
pub mod fs;
pub mod future;
mod future;
pub mod io;
@ -107,8 +116,6 @@ pub mod runtime;
#[cfg(not(loom))]
pub mod signal;
pub mod stream;
#[cfg(feature = "sync")]
pub mod sync;

View File

@ -1,4 +1,5 @@
use futures_util::future;
use crate::future;
use std::io;
use std::net::{IpAddr, SocketAddr};
#[cfg(feature = "dns")]
@ -27,7 +28,7 @@ impl sealed::ToSocketAddrsPriv for SocketAddr {
fn to_socket_addrs(&self) -> Self::Future {
let iter = Some(*self).into_iter();
future::ready(Ok(iter))
future::ok(iter)
}
}
@ -111,7 +112,7 @@ impl sealed::ToSocketAddrsPriv for (IpAddr, u16) {
fn to_socket_addrs(&self) -> Self::Future {
let iter = Some(SocketAddr::from(*self)).into_iter();
future::ready(Ok(iter))
future::ok(iter)
}
}
@ -195,8 +196,6 @@ pub(crate) mod sealed {
type Output = io::Result<OneOrMore>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures_core::ready;
match *self {
MaybeReady::Ready(ref mut i) => {
let iter = OneOrMore::One(i.take().into_iter());

View File

@ -1,8 +1,5 @@
use crate::net::tcp::TcpListener;
use crate::net::tcp::TcpStream;
use crate::net::tcp::{TcpListener, TcpStream};
use futures_core::ready;
use futures_core::stream::Stream;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
@ -11,17 +8,27 @@ use std::task::{Context, Poll};
/// stream of sockets received from a listener.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct Incoming {
inner: TcpListener,
pub struct Incoming<'a> {
inner: &'a mut TcpListener,
}
impl Incoming {
pub(crate) fn new(listener: TcpListener) -> Incoming {
impl Incoming<'_> {
pub(crate) fn new(listener: &mut TcpListener) -> Incoming<'_> {
Incoming { inner: listener }
}
#[doc(hidden)] // TODO: dox
pub fn poll_accept(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<TcpStream>> {
let (socket, _) = ready!(self.inner.poll_accept(cx))?;
Poll::Ready(Ok(socket))
}
}
impl Stream for Incoming {
#[cfg(feature = "stream")]
impl futures_core::Stream for Incoming<'_> {
type Item = io::Result<TcpStream>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {

View File

@ -1,9 +1,8 @@
use crate::future::poll_fn;
use crate::net::tcp::{Incoming, TcpStream};
use crate::net::util::PollEvented;
use crate::net::ToSocketAddrs;
use futures_core::ready;
use futures_util::future::poll_fn;
use std::convert::TryFrom;
use std::fmt;
use std::io;
@ -12,9 +11,6 @@ use std::task::{Context, Poll};
/// An I/O object representing a TCP socket listening for incoming connections.
///
/// This object can be converted into a stream of incoming connections for
/// various forms of processing.
///
/// # Examples
///
/// ```no_run
@ -126,7 +122,8 @@ impl TcpListener {
poll_fn(|cx| self.poll_accept(cx)).await
}
pub(crate) fn poll_accept(
#[doc(hidden)] // TODO: document
pub fn poll_accept(
&mut self,
cx: &mut Context<'_>,
) -> Poll<io::Result<(TcpStream, SocketAddr)>> {
@ -242,7 +239,7 @@ impl TcpListener {
/// necessarily fatal for example having too many open file descriptors or the other side
/// closing the connection while it waits in an accept queue. These would terminate the stream
/// if not handled in any way.
pub fn incoming(self) -> Incoming {
pub fn incoming(&mut self) -> Incoming<'_> {
Incoming::new(self)
}

View File

@ -14,15 +14,13 @@
//! [`TcpStream`]: struct.TcpStream.html
//! [`connect`]: struct.TcpStream.html#method.connect
//! [`TcpListener`]: struct.TcpListener.html
//! [incoming_method]: struct.TcpListener.html#method.incoming
//! [`Incoming`]: struct.Incoming.html
mod incoming;
pub use self::incoming::Incoming;
mod listener;
pub use self::listener::TcpListener;
mod incoming;
pub use self::incoming::Incoming;
pub mod split;
mod stream;

View File

@ -1,11 +1,10 @@
use crate::future::poll_fn;
use crate::io::{AsyncRead, AsyncWrite};
use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
use crate::net::util::PollEvented;
use crate::net::ToSocketAddrs;
use bytes::{Buf, BufMut};
use futures_core::ready;
use futures_util::future::poll_fn;
use iovec::IoVec;
use std::convert::TryFrom;
use std::fmt;

View File

@ -1,9 +1,8 @@
use crate::future::poll_fn;
use crate::net::udp::split::{split, UdpSocketRecvHalf, UdpSocketSendHalf};
use crate::net::util::PollEvented;
use crate::net::ToSocketAddrs;
use futures_core::ready;
use futures_util::future::poll_fn;
use std::convert::TryFrom;
use std::fmt;
use std::io;

View File

@ -12,9 +12,9 @@
//! The halves can be reunited to the original socket with their `reunite`
//! methods.
use super::UdpSocket;
use crate::future::poll_fn;
use crate::net::udp::UdpSocket;
use futures_util::future::poll_fn;
use std::error::Error;
use std::fmt;
use std::io;

View File

@ -1,7 +1,6 @@
use crate::future::poll_fn;
use crate::net::util::PollEvented;
use futures_core::ready;
use futures_util::future::poll_fn;
use std::convert::TryFrom;
use std::fmt;
use std::io;

View File

@ -1,7 +1,5 @@
use super::{UnixListener, UnixStream};
use crate::net::unix::{UnixListener, UnixStream};
use futures_core::ready;
use futures_core::stream::Stream;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
@ -9,21 +7,31 @@ use std::task::{Context, Poll};
/// Stream of listeners
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Incoming {
inner: UnixListener,
pub struct Incoming<'a> {
inner: &'a mut UnixListener,
}
impl Incoming {
pub(crate) fn new(listener: UnixListener) -> Incoming {
impl Incoming<'_> {
pub(crate) fn new(listener: &mut UnixListener) -> Incoming<'_> {
Incoming { inner: listener }
}
#[doc(hidden)] // TODO: dox
pub fn poll_accept(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<UnixStream>> {
let (socket, _) = ready!(self.inner.poll_accept(cx))?;
Poll::Ready(Ok(socket))
}
}
impl Stream for Incoming {
#[cfg(feature = "stream")]
impl futures_core::Stream for Incoming<'_> {
type Item = io::Result<UnixStream>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let (socket, _) = ready!(Pin::new(&mut self.inner).poll_accept(cx))?;
let (socket, _) = ready!(self.inner.poll_accept(cx))?;
Poll::Ready(Some(Ok(socket)))
}
}

View File

@ -1,8 +1,7 @@
use crate::net::unix::UnixStream;
use crate::future::poll_fn;
use crate::net::unix::{Incoming, UnixStream};
use crate::net::util::PollEvented;
use futures_core::ready;
use futures_util::future::poll_fn;
use mio::Ready;
use mio_uds;
use std::convert::TryFrom;
@ -90,8 +89,8 @@ impl UnixListener {
///
/// This method returns an implementation of the `Stream` trait which
/// resolves to the sockets the are accepted on this listener.
pub fn incoming(self) -> super::Incoming {
super::Incoming::new(self)
pub fn incoming(&mut self) -> Incoming<'_> {
Incoming::new(self)
}
}

View File

@ -1,11 +1,10 @@
use crate::future::poll_fn;
use crate::io::{AsyncRead, AsyncWrite};
use crate::net::unix::split::{split, ReadHalf, WriteHalf};
use crate::net::unix::ucred::{self, UCred};
use crate::net::util::PollEvented;
use bytes::{Buf, BufMut};
use futures_core::ready;
use futures_util::future::poll_fn;
use iovec::IoVec;
use std::convert::TryFrom;
use std::fmt;

View File

@ -1,7 +1,6 @@
use crate::io::{AsyncRead, AsyncWrite};
use crate::net::driver::{platform, Registration};
use futures_core::ready;
use mio::event::Evented;
use std::fmt;
use std::io::{self, Read, Write};
@ -55,7 +54,7 @@ use std::task::{Context, Poll};
/// ```rust
/// use tokio::net::util::PollEvented;
///
/// use futures_core::ready;
/// use futures::ready;
/// use mio::Ready;
/// use mio::net::{TcpStream, TcpListener};
/// use std::io;

View File

@ -11,21 +11,6 @@
//!
//! The prelude may grow over time as additional items see ubiquitous use.
#[doc(no_inline)]
pub use crate::future::FutureExt as _;
#[doc(no_inline)]
pub use futures_util::future::FutureExt as _;
pub use std::future::Future;
pub use crate::stream::Stream;
#[doc(no_inline)]
pub use crate::stream::StreamExt as _;
pub use futures_sink::Sink;
#[doc(no_inline)]
pub use futures_util::sink::SinkExt as _;
#[doc(no_inline)]
pub use futures_util::stream::StreamExt as _;
pub use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite};
#[cfg(feature = "io-util")]
#[doc(no_inline)]

View File

@ -58,7 +58,6 @@
//! use tokio::io::{BufReader, AsyncBufReadExt};
//! use tokio::process::Command;
//!
//! use futures_util::stream::StreamExt;
//! use std::process::Stdio;
//!
//! #[tokio::main]
@ -89,8 +88,8 @@
//! println!("child status was: {}", status);
//! });
//!
//! while let Some(line) = reader.next().await {
//! println!("Line: {}", line?);
//! while let Some(line) = reader.next_line().await? {
//! println!("Line: {}", line);
//! }
//!
//! Ok(())
@ -120,8 +119,6 @@ mod kill;
use crate::io::{AsyncRead, AsyncReadExt, AsyncWrite};
use crate::process::kill::Kill;
use futures_core::TryFuture;
use futures_util::future::try_join3;
use std::ffi::OsStr;
use std::future::Future;
use std::io;
@ -681,11 +678,14 @@ impl<T: Kill> Drop for ChildDropGuard<T> {
}
}
impl<T: TryFuture + Kill + Unpin> Future for ChildDropGuard<T> {
type Output = Result<T::Ok, T::Error>;
impl<T, E, F> Future for ChildDropGuard<F>
where
F: Future<Output = Result<T, E>> + Kill + Unpin,
{
type Output = Result<T, E>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let ret = Pin::new(&mut self.inner).try_poll(cx);
let ret = Pin::new(&mut self.inner).poll(cx);
if let Poll::Ready(Ok(_)) = ret {
// Avoid the overhead of trying to kill a reaped process
@ -766,6 +766,8 @@ impl Child {
/// new pipes between parent and child. Use `stdout(Stdio::piped())` or
/// `stderr(Stdio::piped())`, respectively, when creating a `Command`.
pub async fn wait_with_output(mut self) -> io::Result<Output> {
use crate::future::try_join3;
async fn read_to_end<A: AsyncRead + Unpin>(io: Option<A>) -> io::Result<Vec<u8>> {
let mut vec = Vec::new();
if let Some(mut io) = io {
@ -940,16 +942,14 @@ mod sys {
#[cfg(all(test, not(loom)))]
mod test {
use super::kill::Kill;
use super::ChildDropGuard;
use futures::future::FutureExt;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use futures_util::future::FutureExt;
use super::kill::Kill;
use super::ChildDropGuard;
use std::task::{Context, Poll};
struct Mock {
num_kills: usize,
@ -1021,7 +1021,7 @@ mod test {
let mut mock_reaped = Mock::with_result(Poll::Ready(Ok(())));
let mut mock_err = Mock::with_result(Poll::Ready(Err(())));
let waker = futures_util::task::noop_waker();
let waker = futures::task::noop_waker();
let mut context = Context::from_waker(&waker);
{
let mut guard = ChildDropGuard::new(&mut mock_pending);

View File

@ -22,14 +22,16 @@
//! bad in theory...
mod orphan;
mod reap;
use orphan::{OrphanQueue, OrphanQueueImpl, Wait};
mod reap;
use reap::Reaper;
use self::orphan::{OrphanQueue, OrphanQueueImpl, Wait};
use self::reap::Reaper;
use super::SpawnedChild;
use crate::net::util::PollEvented;
use crate::process::kill::Kill;
use crate::process::SpawnedChild;
use crate::signal::unix::{signal, Signal, SignalKind};
use mio::event::Evented;
use mio::unix::{EventedFd, UnixReady};
use mio::{Poll as MioPoll, PollOpt, Ready, Token};
@ -38,11 +40,11 @@ use std::future::Future;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
use std::process::{self, ExitStatus};
use std::process::ExitStatus;
use std::task::Context;
use std::task::Poll;
impl Wait for process::Child {
impl Wait for std::process::Child {
fn id(&self) -> u32 {
self.id()
}
@ -52,14 +54,14 @@ impl Wait for process::Child {
}
}
impl Kill for process::Child {
impl Kill for std::process::Child {
fn kill(&mut self) -> io::Result<()> {
self.kill()
}
}
lazy_static::lazy_static! {
static ref ORPHAN_QUEUE: OrphanQueueImpl<process::Child> = OrphanQueueImpl::new();
static ref ORPHAN_QUEUE: OrphanQueueImpl<std::process::Child> = OrphanQueueImpl::new();
}
struct GlobalOrphanQueue;
@ -70,8 +72,8 @@ impl fmt::Debug for GlobalOrphanQueue {
}
}
impl OrphanQueue<process::Child> for GlobalOrphanQueue {
fn push_orphan(&self, orphan: process::Child) {
impl OrphanQueue<std::process::Child> for GlobalOrphanQueue {
fn push_orphan(&self, orphan: std::process::Child) {
ORPHAN_QUEUE.push_orphan(orphan)
}
@ -82,7 +84,7 @@ impl OrphanQueue<process::Child> for GlobalOrphanQueue {
#[must_use = "futures do nothing unless polled"]
pub(crate) struct Child {
inner: Reaper<process::Child, GlobalOrphanQueue, Signal>,
inner: Reaper<std::process::Child, GlobalOrphanQueue, Signal>,
}
impl fmt::Debug for Child {
@ -93,7 +95,7 @@ impl fmt::Debug for Child {
}
}
pub(crate) fn spawn_child(cmd: &mut process::Command) -> io::Result<SpawnedChild> {
pub(crate) fn spawn_child(cmd: &mut std::process::Command) -> io::Result<SpawnedChild> {
let mut child = cmd.spawn()?;
let stdin = stdio(child.stdin.take())?;
let stdout = stdio(child.stdout.take())?;
@ -196,9 +198,9 @@ where
}
}
pub(crate) type ChildStdin = PollEvented<Fd<process::ChildStdin>>;
pub(crate) type ChildStdout = PollEvented<Fd<process::ChildStdout>>;
pub(crate) type ChildStderr = PollEvented<Fd<process::ChildStderr>>;
pub(crate) type ChildStdin = PollEvented<Fd<std::process::ChildStdin>>;
pub(crate) type ChildStdout = PollEvented<Fd<std::process::ChildStdout>>;
pub(crate) type ChildStderr = PollEvented<Fd<std::process::ChildStderr>>;
fn stdio<T>(option: Option<T>) -> io::Result<Option<PollEvented<Fd<T>>>>
where

View File

@ -1,6 +1,7 @@
use super::orphan::{OrphanQueue, Wait};
use crate::process::imp::orphan::{OrphanQueue, Wait};
use crate::process::kill::Kill;
use futures_core::stream::Stream;
use crate::signal::unix::Signal;
use std::future::Future;
use std::io;
use std::ops::Deref;
@ -22,6 +23,17 @@ where
signal: S,
}
// Work around removal of `futures_core` dependency
pub(crate) trait Stream: Unpin {
fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>;
}
impl Stream for Signal {
fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
Signal::poll_recv(self, cx)
}
}
impl<W, Q, S> Deref for Reaper<W, Q, S>
where
W: Wait + Unpin,
@ -60,7 +72,7 @@ impl<W, Q, S> Future for Reaper<W, Q, S>
where
W: Wait + Unpin,
Q: OrphanQueue<W> + Unpin,
S: Stream + Unpin,
S: Stream,
{
type Output = io::Result<ExitStatus>;
@ -85,7 +97,7 @@ where
// this future's task will be notified/woken up again. Since the
// futures model allows for spurious wake ups this extra wakeup
// should not cause significant issues with parent futures.
let registered_interest = Pin::new(&mut self.signal).poll_next(cx).is_pending();
let registered_interest = self.signal.poll_recv(cx).is_pending();
self.orphan_queue.reap_orphans();
if let Some(status) = self.inner_mut().try_wait()? {
@ -134,11 +146,10 @@ where
#[cfg(all(test, not(loom)))]
mod test {
use super::*;
use futures_core::stream::Stream;
use futures_util::future::FutureExt;
use futures::future::FutureExt;
use std::cell::{Cell, RefCell};
use std::os::unix::process::ExitStatusExt;
use std::pin::Pin;
use std::process::ExitStatus;
use std::task::Context;
use std::task::Poll;
@ -201,13 +212,10 @@ mod test {
}
impl Stream for MockStream {
type Item = io::Result<()>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let inner = Pin::get_mut(self);
inner.total_polls += 1;
match inner.values.remove(0) {
Some(()) => Poll::Ready(Some(Ok(()))),
fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
self.total_polls += 1;
match self.values.remove(0) {
Some(()) => Poll::Ready(Some(())),
None => Poll::Pending,
}
}
@ -247,7 +255,7 @@ mod test {
MockStream::new(vec![None, Some(()), None, None, None]),
);
let waker = futures_util::task::noop_waker();
let waker = futures::task::noop_waker();
let mut context = Context::from_waker(&waker);
// Not yet exited, interest registered

View File

@ -20,8 +20,6 @@ use crate::process::kill::Kill;
use crate::process::SpawnedChild;
use crate::sync::oneshot;
use futures_util::future::Fuse;
use futures_util::future::FutureExt;
use mio_named_pipes::NamedPipe;
use std::fmt;
use std::future::Future;
@ -59,7 +57,7 @@ impl fmt::Debug for Child {
}
struct Waiting {
rx: Fuse<oneshot::Receiver<()>>,
rx: oneshot::Receiver<()>,
wait_object: HANDLE,
tx: *mut Option<oneshot::Sender<()>>,
}
@ -103,7 +101,7 @@ impl Future for Child {
let inner = Pin::get_mut(self);
loop {
if let Some(ref mut w) = inner.waiting {
match w.rx.poll_unpin(cx) {
match Pin::new(&mut w.rx).poll(cx) {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(_)) => panic!("should not be canceled"),
Poll::Pending => return Poll::Pending,
@ -134,7 +132,7 @@ impl Future for Child {
return Poll::Ready(Err(err));
}
inner.waiting = Some(Waiting {
rx: rx.fuse(),
rx,
wait_object,
tx: ptr,
});

View File

@ -334,6 +334,14 @@ impl Runtime {
})
}
/// Enter the runtime context
pub fn enter<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
self.handle.enter(f)
}
/// Return a handle to the runtime's spawner.
///
/// The returned handle can be used to spawn tasks that run on this runtime.

View File

@ -4,7 +4,7 @@ use crate::blocking;
use crate::runtime::thread_pool::ThreadPool;
use crate::runtime::{Park, Unpark};
use futures_util::future::poll_fn;
use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::Ordering::Relaxed;

View File

@ -246,7 +246,7 @@ fn val(num: u32) -> Task<Noop> {
}
fn num(task: Task<Noop>) -> u32 {
use futures_util::task::noop_waker_ref;
use futures::task::noop_waker_ref;
use std::future::Future;
use std::pin::Pin;
use std::task::Context;

View File

@ -1,46 +1,35 @@
#[cfg(unix)]
use super::unix::{self as os_impl, Signal as Inner};
use super::unix::{self as os_impl};
#[cfg(windows)]
use super::windows::{self as os_impl, Event as Inner};
use super::windows::{self as os_impl};
use futures_core::stream::Stream;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Represents a stream which receives "ctrl-c" notifications sent to the process.
/// Completes when a "ctrl-c" notification is sent to the process.
///
/// In general signals are handled very differently across Unix and Windows, but
/// this is somewhat cross platform in terms of how it can be handled. A ctrl-c
/// event to a console process can be represented as a stream for both Windows
/// and Unix.
/// While signals are handled very differently between Unix and Windows, both
/// platforms support receiving a signal on "ctrl-c". This function provides a
/// portable API for receiving this notification.
///
/// Note that there are a number of caveats listening for signals, and you may
/// wish to read up on the documentation in the `unix` or `windows` module to
/// take a peek.
/// Once the returned future is polled, a listener a listener is registered. The
/// future will complete on the first received `ctrl-c` **after** the initial
/// call to either `Future::poll` or `.await`.
///
/// Notably, a notification to this process notifies *all* streams listening to
/// this event. Moreover, the notifications **are coalesced** if they aren't processed
/// quickly enough. This means that if two notifications are received back-to-back,
/// then the stream may only receive one item about the two notifications.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct CtrlC {
inner: Inner,
}
/// Creates a new stream which receives "ctrl-c" notifications sent to the
/// process.
///
/// This function binds to the default reactor.
pub fn ctrl_c() -> io::Result<CtrlC> {
os_impl::ctrl_c().map(|inner| CtrlC { inner })
}
impl Stream for CtrlC {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.inner).poll_next(cx)
}
/// # Examples
///
/// ```rust,no_run
/// use tokio::signal;
///
/// #[tokio::main]
/// async fn main() {
/// println!("waiting for ctrl-c");
///
/// signal::ctrl_c().await.expect("failed to listen for event");
///
/// println!("received ctrl-c event");
/// }
/// ```
pub async fn ctrl_c() -> io::Result<()> {
os_impl::ctrl_c()?.recv().await;
Ok(())
}

View File

@ -14,28 +14,15 @@
//!
//! # Examples
//!
//! Print out all ctrl-C notifications received
//! Print on "ctrl-c" notification.
//!
//! ```rust,no_run
//! use tokio::signal;
//!
//! use futures_util::future;
//! use futures_util::stream::StreamExt;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! // Create an infinite stream of "Ctrl+C" notifications. Each item received
//! // on this stream may represent multiple ctrl-c signals.
//! let ctrl_c = signal::ctrl_c()?;
//!
//! // Process each ctrl-c as it comes in
//! let prog = ctrl_c.for_each(|_| {
//! println!("ctrl-c received!");
//! future::ready(())
//! });
//!
//! prog.await;
//!
//! signal::ctrl_c().await?;
//! println!("ctrl-c received!");
//! Ok(())
//! }
//! ```
@ -45,38 +32,25 @@
//! ```rust,no_run
//! # #[cfg(unix)] {
//!
//! use tokio::signal::{self, unix::{signal, SignalKind}};
//!
//! use futures_util::future;
//! use futures_util::stream::StreamExt;
//! use tokio::signal::unix::{signal, SignalKind};
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! // Create an infinite stream of "Ctrl+C" notifications. Each item received
//! // on this stream may represent multiple ctrl-c signals.
//! let ctrl_c = signal::ctrl_c()?;
//! // An infinite stream of hangup signals.
//! let mut stream = signal(SignalKind::hangup())?;
//!
//! // Process each ctrl-c as it comes in
//! let prog = ctrl_c.for_each(|_| {
//! println!("ctrl-c received!");
//! future::ready(())
//! });
//!
//! prog.await;
//!
//! // Like the previous example, this is an infinite stream of signals
//! // being received, and signals may be coalesced while pending.
//! let stream = signal(SignalKind::hangup())?;
//!
//! // Convert out stream into a future and block the program
//! let (signal, _stream) = stream.into_future().await;
//! println!("got signal {:?}", signal);
//! Ok(())
//! // Print whenever a HUP signal is received
//! loop {
//! stream.recv().await;
//! println!("got signal HUP");
//! }
//! }
//! # }
//! ```
mod ctrl_c;
pub use ctrl_c::ctrl_c;
mod registry;
mod os {
@ -89,5 +63,3 @@ mod os {
pub mod unix;
pub mod windows;
pub use self::ctrl_c::{ctrl_c, CtrlC};

View File

@ -87,6 +87,8 @@ impl<S: Storage> Registry<S> {
///
/// Returns true if an event was delivered to at least one listener.
fn broadcast(&self) -> bool {
use crate::sync::mpsc::error::TrySendError;
let mut did_notify = false;
self.storage.for_each(|event_info| {
// Any signal of this kind arrived since we checked last?
@ -103,17 +105,13 @@ impl<S: Storage> Registry<S> {
for i in (0..recipients.len()).rev() {
match recipients[i].try_send(()) {
Ok(()) => did_notify = true,
Err(ref e) if e.is_closed() => {
Err(TrySendError::Closed(..)) => {
recipients.swap_remove(i);
}
// Channel is full, ignore the error since the
// receiver has already been woken up
Err(e) => {
// Sanity check in case this error type ever gets
// additional variants we have not considered.
debug_assert!(e.is_full());
}
Err(_) => {}
}
}
});
@ -180,7 +178,8 @@ mod tests {
use super::*;
use crate::runtime::{self, Runtime};
use crate::sync::{mpsc, oneshot};
use futures::{future, StreamExt};
use futures::future;
#[test]
fn smoke() {
@ -220,11 +219,7 @@ mod tests {
});
let _ = fire.send(());
let all = future::join3(
first_rx.collect::<Vec<_>>(),
second_rx.collect::<Vec<_>>(),
third_rx.collect::<Vec<_>>(),
);
let all = future::join3(collect(first_rx), collect(second_rx), collect(third_rx));
let (first_results, second_results, third_results) = all.await;
assert_eq!(2, first_results.len());
@ -279,7 +274,7 @@ mod tests {
});
let _ = fire.send(());
let results: Vec<()> = third_rx.collect().await;
let results = collect(third_rx).await;
assert_eq!(1, results.len());
});
@ -311,4 +306,14 @@ mod tests {
fn rt() -> Runtime {
runtime::Builder::new().current_thread().build().unwrap()
}
async fn collect(mut rx: crate::sync::mpsc::Receiver<()>) -> Vec<()> {
let mut ret = vec![];
while let Some(v) = rx.recv().await {
ret.push(v);
}
ret
}
}

View File

@ -10,10 +10,8 @@ use crate::net::util::PollEvented;
use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage};
use crate::sync::mpsc::{channel, Receiver};
use futures_core::stream::Stream;
use libc::c_int;
use mio_uds::UnixStream;
use std::future::Future;
use std::io::{self, Error, ErrorKind, Write};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
@ -262,10 +260,8 @@ struct Driver {
wakeup: PollEvented<UnixStream>,
}
impl Future for Driver {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
impl Driver {
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// Drain the data from the pipe and maintain interest in getting more
self.drain(cx);
// Broadcast any signals which were received
@ -302,7 +298,7 @@ impl Driver {
/// We do *NOT* use the existence of any read bytes as evidence a sigal was
/// received since the `pending` flags would have already been set if that
/// was the case. See #38 for more info.
fn drain(mut self: Pin<&mut Self>, cx: &mut Context<'_>) {
fn drain(&mut self, cx: &mut Context<'_>) {
loop {
match Pin::new(&mut self.wakeup).poll_read(cx, &mut [0; 128]) {
Poll::Ready(Ok(0)) => panic!("EOF on self-pipe"),
@ -395,20 +391,24 @@ pub fn signal(kind: SignalKind) -> io::Result<Signal> {
Ok(Signal { driver, rx })
}
pub(crate) fn ctrl_c() -> io::Result<Signal> {
signal(SignalKind::interrupt())
}
impl Stream for Signal {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let _ = Pin::new(&mut self.driver).poll(cx);
impl Signal {
#[doc(hidden)] // TODO: Dox
pub async fn recv(&mut self) -> Option<()> {
use crate::future::poll_fn;
poll_fn(|cx| self.poll_recv(cx)).await
}
#[doc(hidden)] // TODO: document
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
let _ = self.driver.poll(cx);
self.rx.poll_recv(cx)
}
}
pub(crate) fn ctrl_c() -> io::Result<Signal> {
signal(SignalKind::interrupt())
}
#[cfg(all(test, not(loom)))]
mod tests {
use super::*;

View File

@ -10,10 +10,8 @@
use crate::signal::registry::{globals, EventId, EventInfo, Init, Storage};
use crate::sync::mpsc::{channel, Receiver};
use futures_core::stream::Stream;
use std::convert::TryFrom;
use std::io;
use std::pin::Pin;
use std::sync::Once;
use std::task::{Context, Poll};
use winapi::shared::minwindef::*;
@ -82,6 +80,10 @@ pub(crate) struct Event {
rx: Receiver<()>,
}
pub(crate) fn ctrl_c() -> io::Result<Event> {
Event::new(CTRL_C_EVENT)
}
impl Event {
fn new(signum: DWORD) -> io::Result<Self> {
global_init()?;
@ -91,17 +93,10 @@ impl Event {
Ok(Event { rx })
}
}
pub(crate) fn ctrl_c() -> io::Result<Event> {
Event::new(CTRL_C_EVENT)
}
impl Stream for Event {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.rx.poll_recv(cx)
pub(crate) async fn recv(&mut self) -> Option<()> {
use crate::future::poll_fn;
poll_fn(|cx| self.rx.poll_recv(cx)).await
}
}
@ -109,6 +104,7 @@ fn global_init() -> io::Result<()> {
static INIT: Once = Once::new();
let mut init = None;
INIT.call_once(|| unsafe {
let rc = SetConsoleCtrlHandler(Some(handler), TRUE);
let ret = if rc == 0 {
@ -153,6 +149,22 @@ pub struct CtrlBreak {
inner: Event,
}
impl CtrlBreak {
#[doc(hidden)] // TODO: document
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
self.inner.rx.poll_recv(cx)
}
}
#[cfg(feature = "stream")]
impl futures_core::Stream for CtrlBreak {
type Item = ();
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
self.poll_recv(cx)
}
}
/// Creates a new stream which receives "ctrl-break" notifications sent to the
/// process.
///
@ -161,29 +173,22 @@ pub fn ctrl_break() -> io::Result<CtrlBreak> {
Event::new(CTRL_BREAK_EVENT).map(|inner| CtrlBreak { inner })
}
impl Stream for CtrlBreak {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.inner)
.poll_next(cx)
.map(|item| item.map(|_| ()))
}
}
#[cfg(all(test, not(loom)))]
mod tests {
use super::*;
use crate::runtime::Runtime;
use tokio_test::{assert_ok, assert_pending, assert_ready_ok, task};
use futures_util::stream::StreamExt;
use futures::stream::StreamExt;
#[test]
fn ctrl_c() {
let mut rt = rt();
let rt = rt();
rt.block_on(async {
let ctrl_c = crate::signal::ctrl_c().expect("failed to create CtrlC");
rt.enter(|| {
let mut ctrl_c = task::spawn(crate::signal::ctrl_c());
assert_pending!(ctrl_c.poll());
// Windows doesn't have a good programmatic way of sending events
// like sending signals on Unix, so we'll stub out the actual OS
@ -192,7 +197,7 @@ mod tests {
super::handler(CTRL_C_EVENT);
}
let _ = ctrl_c.into_future().await;
assert_ready_ok!(ctrl_c.poll());
});
}
@ -201,7 +206,7 @@ mod tests {
let mut rt = rt();
rt.block_on(async {
let ctrl_break = super::ctrl_break().expect("failed to create CtrlC");
let mut ctrl_break = assert_ok!(super::ctrl_break());
// Windows doesn't have a good programmatic way of sending events
// like sending signals on Unix, so we'll stub out the actual OS
@ -210,7 +215,7 @@ mod tests {
super::handler(CTRL_BREAK_EVENT);
}
let _ = ctrl_break.into_future().await;
ctrl_break.next().await.unwrap();
});
}

View File

@ -1,78 +0,0 @@
//! A sequence of asynchronous values.
#[cfg(feature = "time")]
use std::time::Duration;
#[cfg(feature = "time")]
use crate::time::{throttle::Throttle, Timeout};
#[doc(inline)]
pub use futures_core::Stream;
#[doc(inline)]
pub use futures_util::stream::{empty, iter, once, pending, poll_fn, repeat, unfold};
/// An extension trait for `Stream` that provides a variety of convenient
/// combinator functions.
///
/// Currently, there are only [`timeout`] and [`throttle`] functions, but
/// this will increase over time.
///
/// Users are not expected to implement this trait. All types that implement
/// `Stream` already implement `StreamExt`.
///
/// This trait can be imported directly or via the Tokio prelude: `use
/// tokio::prelude::*`.
///
/// [`throttle`]: method.throttle
/// [`timeout`]: method.timeout
pub trait StreamExt: Stream {
/// Throttle down the stream by enforcing a fixed delay between items.
///
/// Errors are also delayed.
#[cfg(feature = "time")]
fn throttle(self, duration: Duration) -> Throttle<Self>
where
Self: Sized,
{
Throttle::new(self, duration)
}
/// Creates a new stream which allows `self` until `timeout`.
///
/// This combinator creates a new stream which wraps the receiving stream
/// with a timeout. For each item, the returned stream is allowed to execute
/// until it completes or `timeout` has elapsed, whichever happens first.
///
/// If an item completes before `timeout` then the stream will yield
/// with that item. Otherwise the stream will yield to an error.
///
/// # Examples
///
/// ```
/// use tokio::prelude::*;
///
/// use std::time::Duration;
///
/// # fn slow_stream() -> impl Stream<Item = ()> {
/// # tokio::stream::empty()
/// # }
/// #
/// # async fn dox() {
/// let mut stream = slow_stream()
/// .timeout(Duration::from_secs(1));
///
/// while let Some(value) = stream.next().await {
/// println!("value = {:?}", value);
/// }
/// # }
/// ```
#[cfg(feature = "time")]
fn timeout(self, timeout: Duration) -> Timeout<Self>
where
Self: Sized,
{
Timeout::new(self, timeout)
}
}
impl<T: ?Sized> StreamExt for T where T: Stream {}

View File

@ -8,8 +8,9 @@ use std::sync::Mutex;
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::sync::Barrier;
///
/// use futures::future::join_all;
/// use std::sync::Arc;
/// use futures_util::future::join_all;
///
/// let mut handles = Vec::with_capacity(10);
/// let barrier = Arc::new(Barrier::new(10));

View File

@ -1,8 +1,8 @@
use crate::sync::mpsc::chan;
use crate::sync::mpsc::error::{ClosedError, SendError, TrySendError};
use crate::sync::semaphore;
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Send values to the associated `Receiver`.
@ -44,27 +44,6 @@ impl<T> fmt::Debug for Receiver<T> {
}
}
/// Error returned by the `Sender`.
#[derive(Debug)]
pub struct SendError(());
/// Error returned by `Sender::try_send`.
#[derive(Debug)]
pub struct TrySendError<T> {
kind: ErrorKind,
value: T,
}
#[derive(Debug)]
enum ErrorKind {
Closed,
NoCapacity,
}
/// Error returned by `Receiver`.
#[derive(Debug)]
pub struct RecvError(());
/// Create a bounded mpsc channel for communicating between asynchronous tasks,
/// returning the sender/receiver halves.
///
@ -161,12 +140,12 @@ impl<T> Receiver<T> {
/// }
/// ```
pub async fn recv(&mut self) -> Option<T> {
use futures_util::future::poll_fn;
use crate::future::poll_fn;
poll_fn(|cx| self.poll_recv(cx)).await
}
#[doc(hidden)] // TODO: remove
#[doc(hidden)] // TODO: document
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
@ -180,11 +159,14 @@ impl<T> Receiver<T> {
}
}
impl<T> Unpin for Receiver<T> {}
#[cfg(feature = "stream")]
impl<T> futures_core::Stream for Receiver<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.get_mut().poll_recv(cx)
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.poll_recv(cx)
}
}
@ -193,9 +175,9 @@ impl<T> Sender<T> {
Sender { chan }
}
#[doc(hidden)] // TODO: remove
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
self.chan.poll_ready(cx).map_err(|_| SendError(()))
#[doc(hidden)] // TODO: document
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> {
self.chan.poll_ready(cx).map_err(|_| ClosedError::new())
}
/// Attempts to send a message on this `Sender`, returning the message
@ -233,105 +215,17 @@ impl<T> Sender<T> {
/// }
/// }
/// ```
pub async fn send(&mut self, value: T) -> Result<(), SendError> {
use futures_util::future::poll_fn;
pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> {
use crate::future::poll_fn;
poll_fn(|cx| self.poll_ready(cx)).await?;
self.try_send(value).map_err(|_| SendError(()))
}
}
impl<T> futures_sink::Sink<T> for Sender<T> {
type Error = SendError;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Sender::poll_ready(self.get_mut(), cx)
}
fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
self.as_mut().try_send(msg).map_err(|err| {
assert!(err.is_full(), "call `poll_ready` before sending");
SendError(())
})
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}
// ===== impl SendError =====
impl fmt::Display for SendError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "channel closed")
}
}
impl ::std::error::Error for SendError {}
// ===== impl TrySendError =====
impl<T> TrySendError<T> {
/// Get the inner value.
pub fn into_inner(self) -> T {
self.value
}
/// Did the send fail because the channel has been closed?
pub fn is_closed(&self) -> bool {
if let ErrorKind::Closed = self.kind {
true
} else {
false
if poll_fn(|cx| self.poll_ready(cx)).await.is_err() {
return Err(SendError(value));
}
}
/// Did the send fail because the channel was at capacity?
pub fn is_full(&self) -> bool {
if let ErrorKind::NoCapacity = self.kind {
true
} else {
false
match self.try_send(value) {
Ok(()) => Ok(()),
Err(TrySendError::Full(_)) => unreachable!(),
Err(TrySendError::Closed(value)) => Err(SendError(value)),
}
}
}
impl<T: fmt::Debug> fmt::Display for TrySendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
let descr = match self.kind {
ErrorKind::Closed => "channel closed",
ErrorKind::NoCapacity => "no available capacity",
};
write!(fmt, "{}", descr)
}
}
impl<T: fmt::Debug> ::std::error::Error for TrySendError<T> {}
impl<T> From<(T, chan::TrySendError)> for TrySendError<T> {
fn from((value, err): (T, chan::TrySendError)) -> TrySendError<T> {
TrySendError {
value,
kind: match err {
chan::TrySendError::Closed => ErrorKind::Closed,
chan::TrySendError::NoPermits => ErrorKind::NoCapacity,
},
}
}
}
// ===== impl RecvError =====
impl fmt::Display for RecvError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "channel closed")
}
}
impl ::std::error::Error for RecvError {}

View File

@ -1,5 +1,9 @@
use crate::loom::{cell::CausalCell, future::AtomicWaker, sync::atomic::AtomicUsize, sync::Arc};
use crate::sync::mpsc::list;
use crate::loom::cell::CausalCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
use crate::sync::mpsc::error::ClosedError;
use crate::sync::mpsc::{error, list};
use std::fmt;
use std::process;
@ -43,7 +47,25 @@ where
#[derive(Debug, Eq, PartialEq)]
pub(crate) enum TrySendError {
Closed,
NoPermits,
Full,
}
impl<T> From<(T, TrySendError)> for error::SendError<T> {
fn from(src: (T, TrySendError)) -> error::SendError<T> {
match src.1 {
TrySendError::Closed => error::SendError(src.0),
TrySendError::Full => unreachable!(),
}
}
}
impl<T> From<(T, TrySendError)> for error::TrySendError<T> {
fn from(src: (T, TrySendError)) -> error::TrySendError<T> {
match src.1 {
TrySendError::Closed => error::TrySendError::Closed(src.0),
TrySendError::Full => error::TrySendError::Full(src.0),
}
}
}
pub(crate) trait Semaphore {
@ -59,8 +81,11 @@ pub(crate) trait Semaphore {
fn add_permit(&self);
fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Self::Permit)
-> Poll<Result<(), ()>>;
fn poll_acquire(
&self,
cx: &mut Context<'_>,
permit: &mut Self::Permit,
) -> Poll<Result<(), ClosedError>>;
fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>;
@ -161,26 +186,19 @@ where
}
}
pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> {
self.inner.semaphore.poll_acquire(cx, &mut self.permit)
}
/// Send a message and notify the receiver.
pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> {
if let Err(e) = self.inner.semaphore.try_acquire(&mut self.permit) {
return Err((value, e));
}
self.inner.try_send(value, &mut self.permit)
}
}
// Push the value
self.inner.tx.push(value);
// Notify the rx task
self.inner.rx_waker.wake();
// Release the permit
self.inner.semaphore.forget(&mut self.permit);
Ok(())
impl<T> Tx<T, AtomicUsize> {
pub(crate) fn send_unbounded(&self, value: T) -> Result<(), (T, TrySendError)> {
self.inner.try_send(value, &mut ())
}
}
@ -317,6 +335,28 @@ where
// ===== impl Chan =====
impl<T, S> Chan<T, S>
where
S: Semaphore,
{
fn try_send(&self, value: T, permit: &mut S::Permit) -> Result<(), (T, TrySendError)> {
if let Err(e) = self.semaphore.try_acquire(permit) {
return Err((value, e));
}
// Push the value
self.tx.push(value);
// Notify the rx task
self.rx_waker.wake();
// Release the permit
self.semaphore.forget(permit);
Ok(())
}
}
impl<T, S> Drop for Chan<T, S> {
fn drop(&mut self) {
use super::block::Read::Value;
@ -339,7 +379,7 @@ impl From<TryAcquireError> for TrySendError {
if src.is_closed() {
TrySendError::Closed
} else if src.is_no_permits() {
TrySendError::NoPermits
TrySendError::Full
} else {
unreachable!();
}
@ -369,8 +409,14 @@ impl Semaphore for (crate::sync::semaphore::Semaphore, usize) {
self.0.available_permits() == self.1
}
fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Permit) -> Poll<Result<(), ()>> {
permit.poll_acquire(cx, &self.0).map_err(|_| ())
fn poll_acquire(
&self,
cx: &mut Context<'_>,
permit: &mut Permit,
) -> Poll<Result<(), ClosedError>> {
permit
.poll_acquire(cx, &self.0)
.map_err(|_| ClosedError::new())
}
fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> {
@ -412,8 +458,12 @@ impl Semaphore for AtomicUsize {
self.load(Acquire) >> 1 == 0
}
fn poll_acquire(&self, _cx: &mut Context<'_>, permit: &mut ()) -> Poll<Result<(), ()>> {
Ready(self.try_acquire(permit).map_err(|_| ()))
fn poll_acquire(
&self,
_cx: &mut Context<'_>,
permit: &mut (),
) -> Poll<Result<(), ClosedError>> {
Ready(self.try_acquire(permit).map_err(|_| ClosedError::new()))
}
fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> {

View File

@ -0,0 +1,86 @@
//! Channel error types
use std::error::Error;
use std::fmt;
/// Error returned by the `Sender`.
#[derive(Debug)]
pub struct SendError<T>(pub T);
impl<T> fmt::Display for SendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "channel closed")
}
}
impl<T: fmt::Debug> ::std::error::Error for SendError<T> {}
// ===== TrySendError =====
/// This enumeration is the list of the possible error outcomes for the
/// [try_send](super::Sender::try_send) method.
#[derive(Debug)]
pub enum TrySendError<T> {
/// The data could not be sent on the channel because the channel is
/// currently full and sending would require blocking.
Full(T),
/// The receive half of the channel was explicitly closed or has been
/// dropped.
Closed(T),
}
impl<T: fmt::Debug> Error for TrySendError<T> {}
impl<T> fmt::Display for TrySendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
fmt,
"{}",
match self {
TrySendError::Full(..) => "no available capacity",
TrySendError::Closed(..) => "channel closed",
}
)
}
}
impl<T> From<SendError<T>> for TrySendError<T> {
fn from(src: SendError<T>) -> TrySendError<T> {
TrySendError::Closed(src.0)
}
}
// ===== RecvError =====
/// Error returned by `Receiver`.
#[derive(Debug)]
pub struct RecvError(());
impl fmt::Display for RecvError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "channel closed")
}
}
impl Error for RecvError {}
// ===== ClosedError =====
/// Erorr returned by [`Sender::poll_ready`](super::Sender::poll_ready)].
#[derive(Debug)]
pub struct ClosedError(());
impl ClosedError {
pub(crate) fn new() -> ClosedError {
ClosedError(())
}
}
impl fmt::Display for ClosedError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "channel closed")
}
}
impl Error for ClosedError {}

View File

@ -46,12 +46,7 @@ pub(super) mod list;
mod unbounded;
pub use self::unbounded::{unbounded_channel, UnboundedReceiver, UnboundedSender};
pub mod error {
//! Channel error types
pub use super::bounded::{RecvError, SendError, TrySendError};
pub use super::unbounded::{UnboundedRecvError, UnboundedSendError, UnboundedTrySendError};
}
pub mod error;
/// The number of values a block can contain.
///

View File

@ -1,11 +1,10 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::sync::mpsc::chan;
use crate::sync::mpsc::error::SendError;
use std::fmt;
use std::task::{Context, Poll};
use std::pin::Pin;
/// Send values to the associated `UnboundedReceiver`.
///
/// Instances are created by the
@ -47,18 +46,6 @@ impl<T> fmt::Debug for UnboundedReceiver<T> {
}
}
/// Error returned by the `UnboundedSender`.
#[derive(Debug)]
pub struct UnboundedSendError(());
/// Returned by `UnboundedSender::try_send` when the channel has been closed.
#[derive(Debug)]
pub struct UnboundedTrySendError<T>(T);
/// Error returned by `UnboundedReceiver`.
#[derive(Debug)]
pub struct UnboundedRecvError(());
/// Create an unbounded mpsc channel for communicating between asynchronous
/// tasks.
///
@ -86,7 +73,7 @@ impl<T> UnboundedReceiver<T> {
UnboundedReceiver { chan }
}
#[doc(hidden)] // TODO: remove
#[doc(hidden)] // TODO: doc
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
@ -103,10 +90,10 @@ impl<T> UnboundedReceiver<T> {
///
/// #[tokio::main]
/// async fn main() {
/// let (mut tx, mut rx) = mpsc::unbounded_channel();
/// let (tx, mut rx) = mpsc::unbounded_channel();
///
/// tokio::spawn(async move {
/// tx.try_send("hello").unwrap();
/// tx.send("hello").unwrap();
/// });
///
/// assert_eq!(Some("hello"), rx.recv().await);
@ -121,17 +108,17 @@ impl<T> UnboundedReceiver<T> {
///
/// #[tokio::main]
/// async fn main() {
/// let (mut tx, mut rx) = mpsc::unbounded_channel();
/// let (tx, mut rx) = mpsc::unbounded_channel();
///
/// tx.try_send("hello").unwrap();
/// tx.try_send("world").unwrap();
/// tx.send("hello").unwrap();
/// tx.send("world").unwrap();
///
/// assert_eq!(Some("hello"), rx.recv().await);
/// assert_eq!(Some("world"), rx.recv().await);
/// }
/// ```
pub async fn recv(&mut self) -> Option<T> {
use futures_util::future::poll_fn;
use crate::future::poll_fn;
poll_fn(|cx| self.poll_recv(cx)).await
}
@ -145,11 +132,12 @@ impl<T> UnboundedReceiver<T> {
}
}
#[cfg(feature = "stream")]
impl<T> futures_core::Stream for UnboundedReceiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.poll_recv(cx)
}
}
@ -159,72 +147,8 @@ impl<T> UnboundedSender<T> {
}
/// Attempts to send a message on this `UnboundedSender` without blocking.
pub fn try_send(&mut self, message: T) -> Result<(), UnboundedTrySendError<T>> {
self.chan.try_send(message)?;
pub fn send(&self, message: T) -> Result<(), SendError<T>> {
self.chan.send_unbounded(message)?;
Ok(())
}
}
impl<T> futures_sink::Sink<T> for UnboundedSender<T> {
type Error = UnboundedSendError;
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
self.try_send(msg).map_err(|_| UnboundedSendError(()))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}
// ===== impl UnboundedSendError =====
impl fmt::Display for UnboundedSendError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "channel closed")
}
}
impl ::std::error::Error for UnboundedSendError {}
// ===== impl TrySendError =====
impl<T> UnboundedTrySendError<T> {
/// Get the inner value.
pub fn into_inner(self) -> T {
self.0
}
}
impl<T: fmt::Debug> fmt::Display for UnboundedTrySendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "channel closed")
}
}
impl<T: fmt::Debug> ::std::error::Error for UnboundedTrySendError<T> {}
impl<T> From<(T, chan::TrySendError)> for UnboundedTrySendError<T> {
fn from((value, err): (T, chan::TrySendError)) -> UnboundedTrySendError<T> {
assert_eq!(chan::TrySendError::Closed, err);
UnboundedTrySendError(value)
}
}
// ===== impl UnboundedRecvError =====
impl fmt::Display for UnboundedRecvError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "channel closed")
}
}
impl ::std::error::Error for UnboundedRecvError {}

View File

@ -29,9 +29,9 @@
//! [`Mutex`]: struct.Mutex.html
//! [`MutexGuard`]: struct.MutexGuard.html
use crate::future::poll_fn;
use crate::sync::semaphore;
use futures_util::future::poll_fn;
use std::cell::UnsafeCell;
use std::fmt;
use std::ops::{Deref, DerefMut};

View File

@ -4,7 +4,6 @@ use crate::loom::cell::CausalCell;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
use futures_core::ready;
use std::fmt;
use std::future::Future;
use std::mem::MaybeUninit;
@ -225,7 +224,7 @@ impl<T> Sender<T> {
/// }
/// ```
pub async fn closed(&mut self) {
use futures_util::future::poll_fn;
use crate::future::poll_fn;
poll_fn(|cx| self.poll_closed(cx)).await
}

View File

@ -1,6 +1,6 @@
use crate::sync::task::AtomicWaker;
use futures_util::future::poll_fn;
use futures::future::poll_fn;
use loom::future::block_on;
use loom::sync::atomic::AtomicUsize;
use loom::thread;

View File

@ -1,6 +1,6 @@
use crate::sync::mpsc;
use futures_util::future::poll_fn;
use futures::future::poll_fn;
use loom::future::block_on;
use loom::thread;

View File

@ -1,6 +1,6 @@
use crate::sync::oneshot;
use futures_util::future::poll_fn;
use futures::future::poll_fn;
use loom::future::block_on;
use loom::thread;
use std::task::Poll::{Pending, Ready};

View File

@ -1,7 +1,6 @@
use crate::sync::semaphore::*;
use futures_core::ready;
use futures_util::future::poll_fn;
use futures::future::poll_fn;
use loom::future::block_on;
use loom::thread;
use std::future::Future;

View File

@ -51,20 +51,16 @@
//! [`Sender::closed`]: struct.Sender.html#method.closed
//! [`Receiver::get_ref`]: struct.Receiver.html#method.get_ref
use crate::future::poll_fn;
use crate::sync::task::AtomicWaker;
use core::task::Poll::{Pending, Ready};
use core::task::{Context, Poll};
use fnv::FnvHashMap;
use futures_util::future::poll_fn;
use std::ops;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak};
use futures_core::ready;
use futures_util::pin_mut;
use std::pin::Pin;
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};
/// Receives values from the associated [`Sender`](struct.Sender.html).
///
@ -235,77 +231,50 @@ impl<T> Receiver<T> {
Ref { inner }
}
/// Attempts to receive the latest value sent via the channel.
///
/// If a new, unobserved, value has been sent, a reference to it is
/// returned. If no new value has been sent, then `Pending` is returned and
/// the current task is notified once a new value is sent.
///
/// Only the **most recent** value is returned. If the receiver is falling
/// behind the sender, intermediate values are dropped.
pub async fn recv_ref(&mut self) -> Option<Ref<'_, T>> {
let shared = &self.shared;
let inner = &self.inner;
let version = self.ver;
// TODO: document
#[doc(hidden)]
pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'a, T>>> {
// Make sure the task is up to date
self.inner.waker.register_by_ref(cx.waker());
match poll_fn(|cx| poll_lock(cx, shared, inner, version)).await {
Some((lock, version)) => {
self.ver = version;
Some(lock)
}
None => None,
let state = self.shared.version.load(SeqCst);
let version = state & !CLOSED;
if version != self.ver {
let inner = self.shared.value.read().unwrap();
self.ver = version;
return Ready(Some(Ref { inner }));
}
if CLOSED == state & CLOSED {
// The `Store` handle has been dropped.
return Ready(None);
}
Pending
}
}
fn poll_lock<'a, T>(
cx: &mut Context<'_>,
shared: &'a Arc<Shared<T>>,
inner: &Arc<WatchInner>,
ver: usize,
) -> Poll<Option<(Ref<'a, T>, usize)>> {
// Make sure the task is up to date
inner.waker.register_by_ref(cx.waker());
let state = shared.version.load(SeqCst);
let version = state & !CLOSED;
if version != ver {
let inner = shared.value.read().unwrap();
return Ready(Some((Ref { inner }, version)));
}
if CLOSED == state & CLOSED {
// The `Store` handle has been dropped.
return Ready(None);
}
Pending
}
impl<T: Clone> Receiver<T> {
/// Attempts to clone the latest value sent via the channel.
///
/// This is equivalent to calling `clone()` on the value returned by
/// `recv_ref()`.
#[allow(clippy::map_clone)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3274
pub async fn recv(&mut self) -> Option<T> {
self.recv_ref().await.map(|v_ref| v_ref.clone())
poll_fn(|cx| {
let v_ref = ready!(self.poll_recv_ref(cx));
Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
})
.await
}
}
#[cfg(feature = "stream")]
impl<T: Clone> futures_core::Stream for Receiver<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
use std::future::Future;
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
let v_ref = ready!(self.poll_recv_ref(cx));
let fut = self.get_mut().recv();
pin_mut!(fut);
let item = ready!(fut.poll(cx));
Ready(item.map(|v_ref| v_ref))
Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
}
}
@ -394,27 +363,6 @@ impl<T> Sender<T> {
}
}
impl<T> futures_sink::Sink<T> for Sender<T> {
type Error = error::SendError<T>;
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
self.as_ref().get_ref().broadcast(item)?;
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ready(Ok(()))
}
}
/// Notify all watchers of a change
fn notify_all<T>(shared: &Shared<T>) {
let watchers = shared.watchers.lock().unwrap();

View File

@ -29,7 +29,7 @@ fn create_drop_join_handle() {
#[test]
fn poll_drop_handle_then_drop() {
use futures_util::future::poll_fn;
use futures::future::poll_fn;
use std::pin::Pin;
use std::task::Poll;
@ -196,7 +196,7 @@ fn shutdown_from_queue_after_poll() {
}
fn gated(n: usize, complete_first_poll: bool, by_val: bool) -> impl Future<Output = &'static str> {
use futures_util::future::poll_fn;
use futures::future::poll_fn;
use std::sync::Arc;
use std::task::Poll;
@ -255,7 +255,7 @@ fn join_one_task<T: Future + 'static>(join_handle: T) -> loom::thread::JoinHandl
fn join_two_tasks<T: Future + Unpin + 'static>(
join_handle: T,
) -> loom::thread::JoinHandle<T::Output> {
use futures_util::future::poll_fn;
use futures::future::poll_fn;
use std::task::Poll;
// Join handle

View File

@ -7,7 +7,7 @@ use crate::tests::track_drop::track_drop;
use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok};
use futures_util::future::poll_fn;
use futures::future::poll_fn;
use std::sync::mpsc;
#[test]

View File

@ -4,10 +4,12 @@
//! `test-util` feature flag is enabled, the values returned for `now()` are
//! configurable.
#[cfg(feature = "test-util")]
pub(crate) use self::variant::now;
pub(crate) use self::variant::Clock;
#[cfg(feature = "test-util")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::variant::{advance, pause, resume};
pub(crate) use self::variant::{now, Clock};
#[cfg(not(feature = "test-util"))]
mod variant {

Some files were not shown because too many files have changed in this diff Show More