Migrate to pin project lite (#595)

* REMOVE ME updates peak_ewma test to pass

* adds pin_project_lite dependency

* uses pin_project_lite for load::Constant

* uses pin_project_lite for load::PendingRequestsDiscover

* uses pin_project_lite for load::PeakEwma

* uses pin_project_lite for load::Completion

* uses pin_project_lite for tests::support::IntoStream

Turns IntoStream into a regular struct because pin_project_lite does not and will not support tuple structs.

416be96f77/src/lib.rs (L401-L408)
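For reference, a condensed sketch of the resulting shape (mirroring the change in the diff below; the `new` constructor keeps call sites such as `support::IntoStream::new(rx)` tidy):

```rust
use pin_project_lite::pin_project;

pin_project! {
    // Named field replaces the old tuple layout `IntoStream<S>(#[pin] pub S)`.
    #[derive(Clone, Debug)]
    pub struct IntoStream<S> {
        #[pin]
        inner: S,
    }
}

impl<S> IntoStream<S> {
    pub fn new(inner: S) -> Self {
        Self { inner }
    }
}
```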

* refactors opaque_future into a regular struct

This enables migration to pin_project_lite, which does not and will not support tuple structs
416be96f77/src/lib.rs (L401-L408)
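The same pattern applies to the futures that `opaque_future!` generates: the wrapped future moves into a named `inner` field and call sites construct it through a crate-internal `new`, e.g. `SharedFuture::new(fut)` instead of `SharedFuture(fut)`. A simplified sketch, without the macro plumbing:

```rust
use pin_project_lite::pin_project;
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

pin_project! {
    // Simplified stand-in for what the macro now expands to.
    pub struct SharedFuture<F> {
        #[pin]
        inner: F,
    }
}

impl<F> SharedFuture<F> {
    pub(crate) fn new(inner: F) -> Self {
        Self { inner }
    }
}

impl<F: Future> Future for SharedFuture<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Project to the named field instead of the old `.0`.
        self.project().inner.poll(cx)
    }
}
```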

* migrates opaque_future to use pin_project_lite

* removes tuple variant from load_shed::ResponseState enum

* migrates load_shed::future to pin_project_lite

* removes tuple variant from filter::future::State

* migrates filter::future to pin_project_lite

Note: the doc comment on AsyncResponseFuture::service was also reduced to a regular comment.

This is a known limitation of pin_project_lite that they have labeled as "help wanted".
https://github.com/taiki-e/pin-project-lite/issues/3#issuecomment-745194112
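A minimal sketch of that workaround (generics simplified here): the field-level doc comment becomes a plain `//` comment inside the macro.

```rust
use pin_project_lite::pin_project;

pin_project! {
    pub struct AsyncResponseFuture<F, S> {
        #[pin]
        state: F,
        // Inner service (was `/// Inner service`; field doc comments are not
        // accepted inside pin_project! at the time of this change)
        service: S,
    }
}
```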

* migrates retry::Retry to pin_project_lite

* refactors retry::future::State to enable pin_project_lite

pin_project_lite has the current limitation of not supporting doc comments
https://github.com/taiki-e/pin-project-lite/issues/3#issuecomment-745194112

pin_project_lite does not and will not support tuple variants
416be96f77/src/lib.rs (L401-L408)
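Condensed from the retry diff below: tuple variants such as `Called(#[pin] F)` become braced variants, and `#[project = StateProj]` names the generated projection enum that `poll` matches on.

```rust
use pin_project_lite::pin_project;

pin_project! {
    #[project = StateProj]
    #[derive(Debug)]
    enum State<F, P> {
        // Polling the future from `Service::call`
        Called {
            #[pin]
            future: F,
        },
        // Polling the future from `Policy::retry`
        Checking {
            #[pin]
            checking: P,
        },
        // Polling `Service::poll_ready` after `Checking` was OK
        Retrying,
    }
}
```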

* migrates retry::future to pin_project_lite

* migrates spawn_ready::make to pin_project_lite

* refactors buffer::future::ResponseState to allow pin_project_lite

* migrates buffer::future to pin_project_lite

* refactors util::AndThenFuture to allow pin_project_lite

* migrates util::AndThenFuture to pin_project_lite

* migrates hedge::Future to pin_project_lite

* migrates hedge::select::ResponseFuture to pin_project_lite

* refactors hedge::delay enum for pin_project_lite

* refactors reconnect::future enum for pin_project_lite

* refactors oneshot::State enum for pin_project_lite

* migrates util::oneshot to pin_project_lite

* migrates reconnect::future to pin_project_lite

* migrates hedge::delay to pin_project_lite

* migrates hedge::latency to pin_project_lite

* migrates discover::list to pin_project_lite

* migrates timeout::future to pin_project_lite

* migrates balance::pool to pin_project_lite

* migrates balance::p2c::make to pin_project_lite

* migrates balance::p2c::service to pin_project_lite

* migrates call_all::ordered to pin_project_lite

* migrates call_all::common to pin_project_lite

* migrates call_all::unordered to pin_project_lite

* migrates util::optional::future to pin_project_lite

* migrates limit::concurrency::future to pin_project_lite

* migrates tower-balance example to pin_project_lite

* applies cargo fmt

* migrates tower-test to pin_project_lite

* fixes cargo hack check

peak_ewma and pending_requests will now properly compile without the "discover" feature enabled.

* fixes lint rename warning on nightly

broken_intra_doc_links has been renamed to rustdoc::broken_intra_doc_links

* migrates buffer::Worker to pin_project_lite

pin_project_lite does support PinnedDrop
https://github.com/taiki-e/pin-project-lite/pull/25/files

However, it does not support generic trait bounds on the PinnedDrop impl.

To work around this, I removed the T::Error bound from the Worker struct definition
and moved `close_semaphore` to a new impl without that trait bound.
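A condensed sketch of that workaround (most fields and the semaphore logic elided): the `PinnedDrop` impl inside the macro carries only the `T: Service<Request>` bound, and `close_semaphore` lives on a plain impl block with that same bound, so the `T::Error` bound is not needed anywhere near the drop glue.

```rust
use std::pin::Pin;
use tower_service::Service;

pin_project_lite::pin_project! {
    pub struct Worker<T, Request>
    where
        T: Service<Request>,
    {
        service: T,
        // Simplified; the real field holds a buffered Message.
        current_message: Option<Request>,
    }

    impl<T: Service<Request>, Request> PinnedDrop for Worker<T, Request> {
        fn drop(mut this: Pin<&mut Self>) {
            this.as_mut().close_semaphore();
        }
    }
}

impl<T, Request> Worker<T, Request>
where
    T: Service<Request>,
{
    /// Closes the buffer's semaphore if it is still open, waking any pending
    /// tasks (body elided in this sketch).
    fn close_semaphore(&mut self) {}
}
```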

* fixes abort_on_drop test

This test was also failing on master.

* applies cargo fmt
Michael-J-Ward 2021-07-28 12:48:47 -05:00 committed by GitHub
parent 77760198f1
commit ee131aaf46
49 changed files with 667 additions and 509 deletions

View File

@ -5,7 +5,7 @@
rust_2018_idioms,
unreachable_pub
)]
#![deny(broken_intra_doc_links)]
#![deny(rustdoc::broken_intra_doc_links)]
//! Layer traits and extensions.
//!

View File

@ -5,7 +5,7 @@
rust_2018_idioms,
unreachable_pub
)]
#![deny(broken_intra_doc_links)]
#![deny(rustdoc::broken_intra_doc_links)]
//! Definition of the core `Service` trait to Tower
//!

View File

@ -27,7 +27,7 @@ tokio = { version = "1.0", features = ["sync"] }
tokio-test = "0.4"
tower-layer = { version = "0.3", path = "../tower-layer" }
tower-service = { version = "0.3" }
pin-project = "1"
pin-project-lite = "0.2"
[dev-dependencies]
tokio = { version = "1.0", features = ["macros"] }

View File

@ -6,7 +6,7 @@
unreachable_pub
)]
#![allow(elided_lifetimes_in_paths)]
#![deny(broken_intra_doc_links)]
#![deny(rustdoc::broken_intra_doc_links)]
//! Mock `Service` that can be used in tests.

View File

@ -2,7 +2,7 @@
use crate::mock::error::{self, Error};
use futures_util::ready;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use tokio::sync::oneshot;
use std::{
@ -11,12 +11,13 @@ use std::{
task::{Context, Poll},
};
/// Future of the `Mock` response.
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
rx: Option<Rx<T>>,
pin_project! {
/// Future of the `Mock` response.
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
rx: Option<Rx<T>>,
}
}
type Rx<T> = oneshot::Receiver<Result<T, Error>>;

View File

@ -76,6 +76,7 @@ tokio = { version = "1", optional = true, features = ["sync"] }
tokio-stream = { version = "0.1.0", optional = true }
tokio-util = { version = "0.6.3", default-features = false, optional = true }
tracing = { version = "0.1.2", optional = true }
pin-project-lite = "0.2.7"
[dev-dependencies]
futures = "0.3"

View File

@ -3,7 +3,7 @@
use futures_core::{Stream, TryStream};
use futures_util::{stream, stream::StreamExt, stream::TryStreamExt};
use hdrhistogram::Histogram;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use rand::{self, Rng};
use std::hash::Hash;
use std::time::Duration;
@ -78,8 +78,17 @@ type Error = Box<dyn std::error::Error + Send + Sync>;
type Key = usize;
#[pin_project]
struct Disco<S>(Vec<(Key, S)>);
pin_project! {
struct Disco<S> {
services: Vec<(Key, S)>
}
}
impl<S> Disco<S> {
fn new(services: Vec<(Key, S)>) -> Self {
Self { services }
}
}
impl<S> Stream for Disco<S>
where
@ -88,7 +97,7 @@ where
type Item = Result<Change<Key, S>, Error>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.project().0.pop() {
match self.project().services.pop() {
Some((k, service)) => Poll::Ready(Some(Ok(Change::Insert(k, service)))),
None => {
// there may be more later
@ -105,7 +114,7 @@ fn gen_disco() -> impl Discover<
impl Service<Req, Response = Rsp, Error = Error, Future = impl Send> + Send,
>,
> + Send {
Disco(
Disco::new(
MAX_ENDPOINT_LATENCIES
.iter()
.enumerate()

View File

@ -1,7 +1,7 @@
use super::Balance;
use crate::discover::Discover;
use futures_core::ready;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::hash::Hash;
use std::marker::PhantomData;
use std::{
@ -29,15 +29,16 @@ pub struct MakeBalance<S, Req> {
_marker: PhantomData<fn(Req)>,
}
/// A [`Balance`] in the making.
///
/// [`Balance`]: crate::balance::p2c::Balance
#[pin_project]
#[derive(Debug)]
pub struct MakeFuture<F, Req> {
#[pin]
inner: F,
_marker: PhantomData<fn(Req)>,
pin_project! {
/// A [`Balance`] in the making.
///
/// [`Balance`]: crate::balance::p2c::Balance
#[derive(Debug)]
pub struct MakeFuture<F, Req> {
#[pin]
inner: F,
_marker: PhantomData<fn(Req)>,
}
}
impl<S, Req> MakeBalance<S, Req> {

View File

@ -4,7 +4,7 @@ use crate::load::Load;
use crate::ready_cache::{error::Failed, ReadyCache};
use futures_core::ready;
use futures_util::future::{self, TryFutureExt};
use pin_project::pin_project;
use pin_project_lite::pin_project;
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::hash::Hash;
use std::marker::PhantomData;
@ -59,18 +59,19 @@ where
}
}
/// A Future that becomes satisfied when an `S`-typed service is ready.
///
/// May fail due to cancelation, i.e., if [`Discover`] removes the service from the service set.
#[pin_project]
#[derive(Debug)]
struct UnreadyService<K, S, Req> {
key: Option<K>,
#[pin]
cancel: oneshot::Receiver<()>,
service: Option<S>,
pin_project! {
/// A Future that becomes satisfied when an `S`-typed service is ready.
///
/// May fail due to cancelation, i.e., if [`Discover`] removes the service from the service set.
#[derive(Debug)]
struct UnreadyService<K, S, Req> {
key: Option<K>,
#[pin]
cancel: oneshot::Receiver<()>,
service: Option<S>,
_req: PhantomData<Req>,
_req: PhantomData<Req>,
}
}
enum Error<E> {

View File

@ -19,7 +19,7 @@ use crate::discover::Change;
use crate::load::Load;
use crate::make::MakeService;
use futures_core::{ready, Stream};
use pin_project::pin_project;
use pin_project_lite::pin_project;
use slab::Slab;
use std::{
fmt,
@ -42,23 +42,24 @@ enum Level {
High,
}
/// A wrapper around `MakeService` that discovers a new service when load is high, and removes a
/// service when load is low. See [`Pool`].
#[pin_project]
pub struct PoolDiscoverer<MS, Target, Request>
where
MS: MakeService<Target, Request>,
{
maker: MS,
#[pin]
making: Option<MS::Future>,
target: Target,
load: Level,
services: Slab<()>,
died_tx: tokio::sync::mpsc::UnboundedSender<usize>,
#[pin]
died_rx: tokio::sync::mpsc::UnboundedReceiver<usize>,
limit: Option<usize>,
pin_project! {
/// A wrapper around `MakeService` that discovers a new service when load is high, and removes a
/// service when load is low. See [`Pool`].
pub struct PoolDiscoverer<MS, Target, Request>
where
MS: MakeService<Target, Request>,
{
maker: MS,
#[pin]
making: Option<MS::Future>,
target: Target,
load: Level,
services: Slab<()>,
died_tx: tokio::sync::mpsc::UnboundedSender<usize>,
#[pin]
died_rx: tokio::sync::mpsc::UnboundedReceiver<usize>,
limit: Option<usize>,
}
}
impl<MS, Target, Request> fmt::Debug for PoolDiscoverer<MS, Target, Request>

View File

@ -4,39 +4,50 @@
use super::{error::Closed, message};
use futures_core::ready;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
/// Future that completes when the buffered service eventually services the submitted request.
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
state: ResponseState<T>,
pin_project! {
/// Future that completes when the buffered service eventually services the submitted request.
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
state: ResponseState<T>,
}
}
#[pin_project(project = ResponseStateProj)]
#[derive(Debug)]
enum ResponseState<T> {
Failed(Option<crate::BoxError>),
Rx(#[pin] message::Rx<T>),
Poll(#[pin] T),
pin_project! {
#[project = ResponseStateProj]
#[derive(Debug)]
enum ResponseState<T> {
Failed {
error: Option<crate::BoxError>,
},
Rx {
#[pin]
rx: message::Rx<T>,
},
Poll {
#[pin]
fut: T,
},
}
}
impl<T> ResponseFuture<T> {
pub(crate) fn new(rx: message::Rx<T>) -> Self {
ResponseFuture {
state: ResponseState::Rx(rx),
state: ResponseState::Rx { rx },
}
}
pub(crate) fn failed(err: crate::BoxError) -> Self {
ResponseFuture {
state: ResponseState::Failed(Some(err)),
state: ResponseState::Failed { error: Some(err) },
}
}
}
@ -53,15 +64,15 @@ where
loop {
match this.state.as_mut().project() {
ResponseStateProj::Failed(e) => {
return Poll::Ready(Err(e.take().expect("polled after error")));
ResponseStateProj::Failed { error } => {
return Poll::Ready(Err(error.take().expect("polled after error")));
}
ResponseStateProj::Rx(rx) => match ready!(rx.poll(cx)) {
Ok(Ok(f)) => this.state.set(ResponseState::Poll(f)),
ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) {
Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }),
Ok(Err(e)) => return Poll::Ready(Err(e.into())),
Err(_) => return Poll::Ready(Err(Closed::new().into())),
},
ResponseStateProj::Poll(fut) => return fut.poll(cx).map_err(Into::into),
ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into),
}
}
}

View File

@ -3,7 +3,6 @@ use super::{
message::Message,
};
use futures_core::ready;
use pin_project::pin_project;
use std::sync::{Arc, Mutex, Weak};
use std::{
future::Future,
@ -13,27 +12,34 @@ use std::{
use tokio::sync::{mpsc, Semaphore};
use tower_service::Service;
/// Task that handles processing the buffer. This type should not be used
/// directly, instead `Buffer` requires an `Executor` that can accept this task.
///
/// The struct is `pub` in the private module and the type is *not* re-exported
/// as part of the public API. This is the "sealed" pattern to include "private"
/// types in public traits that are not meant for consumers of the library to
/// implement (only call).
#[pin_project(PinnedDrop)]
#[derive(Debug)]
pub struct Worker<T, Request>
where
T: Service<Request>,
T::Error: Into<crate::BoxError>,
{
current_message: Option<Message<Request, T::Future>>,
rx: mpsc::UnboundedReceiver<Message<Request, T::Future>>,
service: T,
finish: bool,
failed: Option<ServiceError>,
handle: Handle,
close: Option<Weak<Semaphore>>,
pin_project_lite::pin_project! {
/// Task that handles processing the buffer. This type should not be used
/// directly, instead `Buffer` requires an `Executor` that can accept this task.
///
/// The struct is `pub` in the private module and the type is *not* re-exported
/// as part of the public API. This is the "sealed" pattern to include "private"
/// types in public traits that are not meant for consumers of the library to
/// implement (only call).
#[derive(Debug)]
pub struct Worker<T, Request>
where
T: Service<Request>,
{
current_message: Option<Message<Request, T::Future>>,
rx: mpsc::UnboundedReceiver<Message<Request, T::Future>>,
service: T,
finish: bool,
failed: Option<ServiceError>,
handle: Handle,
close: Option<Weak<Semaphore>>,
}
impl<T: Service<Request>, Request> PinnedDrop for Worker<T, Request>
{
fn drop(mut this: Pin<&mut Self>) {
this.as_mut().close_semaphore();
}
}
}
/// Get the error out
@ -42,6 +48,22 @@ pub(crate) struct Handle {
inner: Arc<Mutex<Option<ServiceError>>>,
}
impl<T, Request> Worker<T, Request>
where
T: Service<Request>,
{
/// Closes the buffer's semaphore if it is still open, waking any pending
/// tasks.
fn close_semaphore(&mut self) {
if let Some(close) = self.close.take().as_ref().and_then(Weak::upgrade) {
tracing::debug!("buffer closing; waking pending tasks");
close.close();
} else {
tracing::trace!("buffer already closed");
}
}
}
impl<T, Request> Worker<T, Request>
where
T: Service<Request>,
@ -141,17 +163,6 @@ where
// requests that we receive before we've exhausted the receiver receive the error:
self.failed = Some(error);
}
/// Closes the buffer's semaphore if it is still open, waking any pending
/// tasks.
fn close_semaphore(&mut self) {
if let Some(close) = self.close.take().as_ref().and_then(Weak::upgrade) {
tracing::debug!("buffer closing; waking pending tasks");
close.close();
} else {
tracing::trace!("buffer already closed");
}
}
}
impl<T, Request> Future for Worker<T, Request>
@ -225,17 +236,6 @@ where
}
}
#[pin_project::pinned_drop]
impl<T, Request> PinnedDrop for Worker<T, Request>
where
T: Service<Request>,
T::Error: Into<crate::BoxError>,
{
fn drop(mut self: Pin<&mut Self>) {
self.as_mut().close_semaphore();
}
}
impl Handle {
pub(crate) fn get_error_on_closed(&self) -> crate::BoxError {
self.inner

View File

@ -1,6 +1,6 @@
use super::{error::Never, Change};
use futures_core::Stream;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::iter::{Enumerate, IntoIterator};
use std::{
pin::Pin,
@ -8,17 +8,18 @@ use std::{
};
use tower_service::Service;
/// Static service discovery based on a predetermined list of services.
///
/// [`ServiceList`] is created with an initial list of services. The discovery
/// process will yield this list once and do nothing after.
#[pin_project]
#[derive(Debug)]
pub struct ServiceList<T>
where
T: IntoIterator,
{
inner: Enumerate<T::IntoIter>,
pin_project! {
/// Static service discovery based on a predetermined list of services.
///
/// [`ServiceList`] is created with an initial list of services. The discovery
/// process will yield this list once and do nothing after.
#[derive(Debug)]
pub struct ServiceList<T>
where
T: IntoIterator,
{
inner: Enumerate<T::IntoIter>,
}
}
impl<T, U> ServiceList<T>

View File

@ -3,7 +3,7 @@
use super::AsyncPredicate;
use crate::BoxError;
use futures_core::ready;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
@ -11,21 +11,22 @@ use std::{
};
use tower_service::Service;
/// Filtered response future from [`AsyncFilter`] services.
///
/// [`AsyncFilter`]: crate::filter::AsyncFilter
#[pin_project]
#[derive(Debug)]
pub struct AsyncResponseFuture<P, S, Request>
where
P: AsyncPredicate<Request>,
S: Service<P::Request>,
{
#[pin]
state: State<P::Future, S::Future>,
pin_project! {
/// Filtered response future from [`AsyncFilter`] services.
///
/// [`AsyncFilter`]: crate::filter::AsyncFilter
#[derive(Debug)]
pub struct AsyncResponseFuture<P, S, Request>
where
P: AsyncPredicate<Request>,
S: Service<P::Request>,
{
#[pin]
state: State<P::Future, S::Future>,
/// Inner service
service: S,
// Inner service
service: S,
}
}
opaque_future! {
@ -39,13 +40,21 @@ opaque_future! {
>;
}
#[pin_project(project = StateProj)]
#[derive(Debug)]
enum State<F, G> {
/// Waiting for the predicate future
Check(#[pin] F),
/// Waiting for the response future
WaitResponse(#[pin] G),
pin_project! {
#[project = StateProj]
#[derive(Debug)]
enum State<F, G> {
/// Waiting for the predicate future
Check {
#[pin]
check: F
},
/// Waiting for the response future
WaitResponse {
#[pin]
response: G
},
}
}
impl<P, S, Request> AsyncResponseFuture<P, S, Request>
@ -56,7 +65,7 @@ where
{
pub(crate) fn new(check: P::Future, service: S) -> Self {
Self {
state: State::Check(check),
state: State::Check { check },
service,
}
}
@ -75,12 +84,12 @@ where
loop {
match this.state.as_mut().project() {
StateProj::Check(mut check) => {
StateProj::Check { mut check } => {
let request = ready!(check.as_mut().poll(cx))?;
let response = this.service.call(request);
this.state.set(State::WaitResponse(response));
this.state.set(State::WaitResponse { response });
}
StateProj::WaitResponse(response) => {
StateProj::WaitResponse { response } => {
return response.poll(cx).map_err(Into::into);
}
}

View File

@ -112,7 +112,7 @@ where
}
fn call(&mut self, request: Request) -> Self::Future {
ResponseFuture(match self.predicate.check(request) {
ResponseFuture::new(match self.predicate.check(request) {
Ok(request) => Either::Right(self.inner.call(request).err_into()),
Err(e) => Either::Left(futures_util::future::ready(Err(e.into()))),
})

View File

@ -1,5 +1,5 @@
use futures_util::ready;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::time::Duration;
use std::{
future::Future,
@ -23,22 +23,42 @@ pub struct Delay<P, S> {
service: S,
}
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<Request, S>
where
S: Service<Request>,
{
service: Option<S>,
#[pin]
state: State<Request, Oneshot<S, Request>>,
pin_project! {
#[derive(Debug)]
pub struct ResponseFuture<Request, S>
where
S: Service<Request>,
{
service: Option<S>,
#[pin]
state: State<Request, Oneshot<S, Request>>,
}
}
#[pin_project(project = StateProj)]
#[derive(Debug)]
enum State<Request, F> {
Delaying(#[pin] tokio::time::Sleep, Option<Request>),
Called(#[pin] F),
pin_project! {
#[project = StateProj]
#[derive(Debug)]
enum State<Request, F> {
Delaying {
#[pin]
delay: tokio::time::Sleep,
req: Option<Request>,
},
Called {
#[pin]
fut: F,
},
}
}
impl<Request, F> State<Request, F> {
fn delaying(delay: tokio::time::Sleep, req: Option<Request>) -> Self {
Self::Delaying { delay, req }
}
fn called(fut: F) -> Self {
Self::Called { fut }
}
}
impl<P, S> Delay<P, S> {
@ -73,7 +93,7 @@ where
let delay = self.policy.delay(&request);
ResponseFuture {
service: Some(self.service.clone()),
state: State::Delaying(tokio::time::sleep(delay), Some(request)),
state: State::delaying(tokio::time::sleep(delay), Some(request)),
}
}
}
@ -90,14 +110,14 @@ where
loop {
match this.state.as_mut().project() {
StateProj::Delaying(delay, req) => {
StateProj::Delaying { delay, req } => {
ready!(delay.poll(cx));
let req = req.take().expect("Missing request in delay");
let svc = this.service.take().expect("Missing service in delay");
let fut = Oneshot::new(svc, req);
this.state.set(State::Called(fut));
this.state.set(State::called(fut));
}
StateProj::Called(fut) => {
StateProj::Called { fut } => {
return fut.poll(cx).map_err(Into::into);
}
};

View File

@ -1,5 +1,5 @@
use futures_util::ready;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::time::Duration;
use std::{
future::Future,
@ -24,13 +24,14 @@ pub struct Latency<R, S> {
service: S,
}
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<R, F> {
start: Instant,
rec: R,
#[pin]
inner: F,
pin_project! {
#[derive(Debug)]
pub struct ResponseFuture<R, F> {
start: Instant,
rec: R,
#[pin]
inner: F,
}
}
impl<S, R> Latency<R, S>

View File

@ -5,7 +5,7 @@
use crate::filter::AsyncFilter;
use futures_util::future;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{
@ -37,17 +37,18 @@ type Service<S, P> = select::Select<
#[derive(Debug)]
pub struct Hedge<S, P>(Service<S, P>);
/// The [`Future`] returned by the [`Hedge`] service.
///
/// [`Future`]: std::future::Future
#[pin_project]
#[derive(Debug)]
pub struct Future<S, Request>
where
S: tower_service::Service<Request>,
{
#[pin]
inner: S::Future,
pin_project! {
/// The [`Future`] returned by the [`Hedge`] service.
///
/// [`Future`]: std::future::Future
#[derive(Debug)]
pub struct Future<S, Request>
where
S: tower_service::Service<Request>,
{
#[pin]
inner: S::Future,
}
}
/// A policy which describes which requests can be cloned and then whether those

View File

@ -1,4 +1,4 @@
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
@ -23,13 +23,14 @@ pub struct Select<P, A, B> {
b: B,
}
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<AF, BF> {
#[pin]
a_fut: AF,
#[pin]
b_fut: Option<BF>,
pin_project! {
#[derive(Debug)]
pub struct ResponseFuture<AF, BF> {
#[pin]
a_fut: AF,
#[pin]
b_fut: Option<BF>,
}
}
impl<P, A, B> Select<P, A, B> {

View File

@ -5,7 +5,7 @@
rust_2018_idioms,
unreachable_pub
)]
#![deny(broken_intra_doc_links)]
#![deny(rustdoc::broken_intra_doc_links)]
#![allow(elided_lifetimes_in_paths, clippy::type_complexity)]
#![cfg_attr(test, allow(clippy::float_cmp))]
#![cfg_attr(docsrs, feature(doc_cfg))]

View File

@ -2,7 +2,7 @@
//!
//! [`Future`]: std::future::Future
use futures_core::ready;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
@ -10,16 +10,17 @@ use std::{
};
use tokio::sync::OwnedSemaphorePermit;
/// Future for the [`ConcurrencyLimit`] service.
///
/// [`ConcurrencyLimit`]: crate::limit::ConcurrencyLimit
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
inner: T,
// Keep this around so that it is dropped when the future completes
_permit: OwnedSemaphorePermit,
pin_project! {
/// Future for the [`ConcurrencyLimit`] service.
///
/// [`ConcurrencyLimit`]: crate::limit::ConcurrencyLimit
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
inner: T,
// Keep this around so that it is dropped when the future completes
_permit: OwnedSemaphorePermit,
}
}
impl<T> ResponseFuture<T> {

View File

@ -1,7 +1,7 @@
//! Application-specific request completion semantics.
use futures_core::ready;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
@ -44,14 +44,15 @@ pub trait TrackCompletion<H, V>: Clone {
#[non_exhaustive]
pub struct CompleteOnResponse;
/// Attaches a `C`-typed completion tracker to the result of an `F`-typed [`Future`].
#[pin_project]
#[derive(Debug)]
pub struct TrackCompletionFuture<F, C, H> {
#[pin]
future: F,
handle: Option<H>,
completion: C,
pin_project! {
/// Attaches a `C`-typed completion tracker to the result of an `F`-typed [`Future`].
#[derive(Debug)]
pub struct TrackCompletionFuture<F, C, H> {
#[pin]
future: F,
handle: Option<H>,
completion: C,
}
}
// ===== impl InstrumentFuture =====

View File

@ -8,18 +8,19 @@ use futures_core::{ready, Stream};
use std::pin::Pin;
use super::Load;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::task::{Context, Poll};
use tower_service::Service;
/// Wraps a type so that it implements [`Load`] and returns a constant load metric.
///
/// This load estimator is primarily useful for testing.
#[pin_project]
#[derive(Debug)]
pub struct Constant<T, M> {
inner: T,
load: M,
pin_project! {
#[derive(Debug)]
/// Wraps a type so that it implements [`Load`] and returns a constant load metric.
///
/// This load estimator is primarily useful for testing.
pub struct Constant<T, M> {
inner: T,
load: M,
}
}
// ===== impl Constant =====

View File

@ -5,7 +5,7 @@ use crate::discover::{Change, Discover};
#[cfg(feature = "discover")]
use futures_core::{ready, Stream};
#[cfg(feature = "discover")]
use pin_project::pin_project;
use pin_project_lite::pin_project;
#[cfg(feature = "discover")]
use std::pin::Pin;
@ -48,17 +48,18 @@ pub struct PeakEwma<S, C = CompleteOnResponse> {
completion: C,
}
/// Wraps a `D`-typed stream of discovered services with `PeakEwma`.
#[pin_project]
#[derive(Debug)]
#[cfg(feature = "discover")]
#[cfg_attr(docsrs, doc(cfg(feature = "discover")))]
pub struct PeakEwmaDiscover<D, C = CompleteOnResponse> {
#[pin]
discover: D,
decay_ns: f64,
default_rtt: Duration,
completion: C,
pin_project! {
/// Wraps a `D`-typed stream of discovered services with `PeakEwma`.
#[derive(Debug)]
pub struct PeakEwmaDiscover<D, C = CompleteOnResponse> {
#[pin]
discover: D,
decay_ns: f64,
default_rtt: Duration,
completion: C,
}
}
/// Represents the relative cost of communicating with a service.
@ -378,11 +379,11 @@ mod tests {
time::advance(Duration::from_millis(100)).await;
let () = assert_ready_ok!(rsp0.poll());
assert_eq!(svc.load(), Cost(404_000_000.0));
assert_eq!(svc.load(), Cost(400_000_000.0));
time::advance(Duration::from_millis(100)).await;
let () = assert_ready_ok!(rsp1.poll());
assert_eq!(svc.load(), Cost(202_000_000.0));
assert_eq!(svc.load(), Cost(200_000_000.0));
// Check that values decay as time elapses
time::advance(Duration::from_secs(1)).await;

View File

@ -5,7 +5,7 @@ use crate::discover::{Change, Discover};
#[cfg(feature = "discover")]
use futures_core::{ready, Stream};
#[cfg(feature = "discover")]
use pin_project::pin_project;
use pin_project_lite::pin_project;
#[cfg(feature = "discover")]
use std::pin::Pin;
@ -27,15 +27,16 @@ pub struct PendingRequests<S, C = CompleteOnResponse> {
#[derive(Clone, Debug, Default)]
struct RefCount(Arc<()>);
/// Wraps a `D`-typed stream of discovered services with [`PendingRequests`].
#[pin_project]
#[derive(Debug)]
#[cfg(feature = "discover")]
#[cfg_attr(docsrs, doc(cfg(feature = "discover")))]
pub struct PendingRequestsDiscover<D, C = CompleteOnResponse> {
#[pin]
discover: D,
completion: C,
pin_project! {
/// Wraps a `D`-typed stream of discovered services with [`PendingRequests`].
#[derive(Debug)]
pub struct PendingRequestsDiscover<D, C = CompleteOnResponse> {
#[pin]
discover: D,
completion: C,
}
}
/// Represents the number of currently-pending requests to a given service.

View File

@ -6,29 +6,35 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::ready;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use super::error::Overloaded;
/// Future for the [`LoadShed`] service.
///
/// [`LoadShed`]: crate::load_shed::LoadShed
#[pin_project]
pub struct ResponseFuture<F> {
#[pin]
state: ResponseState<F>,
pin_project! {
/// Future for the [`LoadShed`] service.
///
/// [`LoadShed`]: crate::load_shed::LoadShed
pub struct ResponseFuture<F> {
#[pin]
state: ResponseState<F>,
}
}
#[pin_project(project = ResponseStateProj)]
enum ResponseState<F> {
Called(#[pin] F),
Overloaded,
pin_project! {
#[project = ResponseStateProj]
enum ResponseState<F> {
Called {
#[pin]
fut: F
},
Overloaded,
}
}
impl<F> ResponseFuture<F> {
pub(crate) fn called(fut: F) -> Self {
ResponseFuture {
state: ResponseState::Called(fut),
state: ResponseState::Called { fut },
}
}
@ -48,7 +54,9 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project().state.project() {
ResponseStateProj::Called(fut) => Poll::Ready(ready!(fut.poll(cx)).map_err(Into::into)),
ResponseStateProj::Called { fut } => {
Poll::Ready(ready!(fut.poll(cx)).map_err(Into::into))
}
ResponseStateProj::Overloaded => Poll::Ready(Err(Overloaded::new().into())),
}
}

View File

@ -6,9 +6,21 @@
))]
macro_rules! opaque_future {
($(#[$m:meta])* pub type $name:ident<$($param:ident),+> = $actual:ty;) => {
#[pin_project::pin_project]
$(#[$m])*
pub struct $name<$($param),+>(#[pin] pub(crate) $actual);
pin_project_lite::pin_project! {
$(#[$m])*
pub struct $name<$($param),+> {
#[pin]
inner: $actual
}
}
impl<$($param),+> $name<$($param),+> {
pub(crate) fn new(inner: $actual) -> Self {
Self {
inner
}
}
}
impl<$($param),+> std::fmt::Debug for $name<$($param),+> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@ -23,7 +35,7 @@ macro_rules! opaque_future {
type Output = <$actual as std::future::Future>::Output;
#[inline]
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
self.project().0.poll(cx)
self.project().inner.poll(cx)
}
}
}

View File

@ -93,7 +93,7 @@ where
}
fn call(&mut self, _target: T) -> Self::Future {
SharedFuture(futures_util::future::ready(Ok(self.service.clone())))
SharedFuture::new(futures_util::future::ready(Ok(self.service.clone())))
}
}

View File

@ -1,35 +1,53 @@
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
/// Future that resolves to the response or failure to connect.
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<F, E> {
#[pin]
inner: Inner<F, E>,
pin_project! {
/// Future that resolves to the response or failure to connect.
#[derive(Debug)]
pub struct ResponseFuture<F, E> {
#[pin]
inner: Inner<F, E>,
}
}
#[pin_project(project = InnerProj)]
#[derive(Debug)]
enum Inner<F, E> {
Future(#[pin] F),
Error(Option<E>),
pin_project! {
#[project = InnerProj]
#[derive(Debug)]
enum Inner<F, E> {
Future {
#[pin]
fut: F,
},
Error {
error: Option<E>,
},
}
}
impl<F, E> Inner<F, E> {
fn future(fut: F) -> Self {
Self::Future { fut }
}
fn error(error: Option<E>) -> Self {
Self::Error { error }
}
}
impl<F, E> ResponseFuture<F, E> {
pub(crate) fn new(inner: F) -> Self {
ResponseFuture {
inner: Inner::Future(inner),
inner: Inner::future(inner),
}
}
pub(crate) fn error(error: E) -> Self {
ResponseFuture {
inner: Inner::Error(Some(error)),
inner: Inner::error(Some(error)),
}
}
}
@ -45,9 +63,9 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
match me.inner.project() {
InnerProj::Future(fut) => fut.poll(cx).map_err(Into::into),
InnerProj::Error(e) => {
let e = e.take().expect("Polled after ready.").into();
InnerProj::Future { fut } => fut.poll(cx).map_err(Into::into),
InnerProj::Error { error } => {
let e = error.take().expect("Polled after ready.").into();
Poll::Ready(Err(e))
}
}

View File

@ -2,36 +2,45 @@
use super::{Policy, Retry};
use futures_core::ready;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower_service::Service;
/// The [`Future`] returned by a [`Retry`] service.
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<P, S, Request>
where
P: Policy<Request, S::Response, S::Error>,
S: Service<Request>,
{
request: Option<Request>,
#[pin]
retry: Retry<P, S>,
#[pin]
state: State<S::Future, P::Future>,
pin_project! {
/// The [`Future`] returned by a [`Retry`] service.
#[derive(Debug)]
pub struct ResponseFuture<P, S, Request>
where
P: Policy<Request, S::Response, S::Error>,
S: Service<Request>,
{
request: Option<Request>,
#[pin]
retry: Retry<P, S>,
#[pin]
state: State<S::Future, P::Future>,
}
}
#[pin_project(project = StateProj)]
#[derive(Debug)]
enum State<F, P> {
/// Polling the future from [`Service::call`]
Called(#[pin] F),
/// Polling the future from [`Policy::retry`]
Checking(#[pin] P),
/// Polling [`Service::poll_ready`] after [`Checking`] was OK.
Retrying,
pin_project! {
#[project = StateProj]
#[derive(Debug)]
enum State<F, P> {
// Polling the future from [`Service::call`]
Called {
#[pin]
future: F
},
// Polling the future from [`Policy::retry`]
Checking {
#[pin]
checking: P
},
// Polling [`Service::poll_ready`] after [`Checking`] was OK.
Retrying,
}
}
impl<P, S, Request> ResponseFuture<P, S, Request>
@ -47,7 +56,7 @@ where
ResponseFuture {
request,
retry,
state: State::Called(future),
state: State::Called { future },
}
}
}
@ -64,12 +73,12 @@ where
loop {
match this.state.as_mut().project() {
StateProj::Called(future) => {
StateProj::Called { future } => {
let result = ready!(future.poll(cx));
if let Some(ref req) = this.request {
match this.retry.policy.retry(req, result.as_ref()) {
Some(checking) => {
this.state.set(State::Checking(checking));
this.state.set(State::Checking { checking });
}
None => return Poll::Ready(result),
}
@ -78,12 +87,12 @@ where
return Poll::Ready(result);
}
}
StateProj::Checking(future) => {
StateProj::Checking { checking } => {
this.retry
.as_mut()
.project()
.policy
.set(ready!(future.poll(cx)));
.set(ready!(checking.poll(cx)));
this.state.set(State::Retrying);
}
StateProj::Retrying => {
@ -104,9 +113,9 @@ where
.take()
.expect("retrying requires cloned request");
*this.request = this.retry.policy.clone_request(&req);
this.state.set(State::Called(
this.retry.as_mut().project().service.call(req),
));
this.state.set(State::Called {
future: this.retry.as_mut().project().service.call(req),
});
}
}
}

View File

@ -9,19 +9,20 @@ pub use self::layer::RetryLayer;
pub use self::policy::Policy;
use self::future::ResponseFuture;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::task::{Context, Poll};
use tower_service::Service;
/// Configure retrying requests of "failed" responses.
///
/// A [`Policy`] classifies what is a "failed" response.
#[pin_project]
#[derive(Clone, Debug)]
pub struct Retry<P, S> {
#[pin]
policy: P,
service: S,
pin_project! {
/// Configure retrying requests of "failed" responses.
///
/// A [`Policy`] classifies what is a "failed" response.
#[derive(Clone, Debug)]
pub struct Retry<P, S> {
#[pin]
policy: P,
service: S,
}
}
// ===== impl Retry =====

View File

@ -1,6 +1,6 @@
use super::SpawnReady;
use futures_core::ready;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
@ -21,12 +21,13 @@ impl<S> MakeSpawnReady<S> {
}
}
/// Builds a [`SpawnReady`] with the result of an inner [`Future`].
#[pin_project]
#[derive(Debug)]
pub struct MakeFuture<F> {
#[pin]
inner: F,
pin_project! {
/// Builds a [`SpawnReady`] with the result of an inner [`Future`].
#[derive(Debug)]
pub struct MakeFuture<F> {
#[pin]
inner: F,
}
}
impl<S, Target> Service<Target> for MakeSpawnReady<S>

View File

@ -75,7 +75,7 @@ where
fn call(&mut self, request: Req) -> Self::Future {
match self.inner {
Inner::Service(Some(ref mut svc)) => {
ResponseFuture(svc.call(request).map_err(Into::into))
ResponseFuture::new(svc.call(request).map_err(Into::into))
}
_ => unreachable!("poll_ready must be called"),
}

View File

@ -1,7 +1,7 @@
//! Future types
use super::error::Elapsed;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
@ -9,16 +9,17 @@ use std::{
};
use tokio::time::Sleep;
/// [`Timeout`] response future
///
/// [`Timeout`]: crate::timeout::Timeout
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
response: T,
#[pin]
sleep: Sleep,
pin_project! {
/// [`Timeout`] response future
///
/// [`Timeout`]: crate::timeout::Timeout
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
response: T,
#[pin]
sleep: Sleep,
}
}
impl<T> ResponseFuture<T> {

View File

@ -28,13 +28,21 @@ where
}
}
/// Response future from [`AndThen`] services.
///
/// [`AndThen`]: crate::util::AndThen
#[pin_project::pin_project]
pub struct AndThenFuture<F1, F2: TryFuture, N>(
#[pin] pub(crate) future::AndThen<future::ErrInto<F1, F2::Error>, F2, N>,
);
pin_project_lite::pin_project! {
/// Response future from [`AndThen`] services.
///
/// [`AndThen`]: crate::util::AndThen
pub struct AndThenFuture<F1, F2: TryFuture, N> {
#[pin]
inner: future::AndThen<future::ErrInto<F1, F2::Error>, F2, N>,
}
}
impl<F1, F2: TryFuture, N> AndThenFuture<F1, F2, N> {
pub(crate) fn new(inner: future::AndThen<future::ErrInto<F1, F2::Error>, F2, N>) -> Self {
Self { inner }
}
}
impl<F1, F2: TryFuture, N> std::fmt::Debug for AndThenFuture<F1, F2, N> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@ -52,7 +60,7 @@ where
#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().0.poll(cx)
self.project().inner.poll(cx)
}
}
@ -96,7 +104,7 @@ where
}
fn call(&mut self, request: Request) -> Self::Future {
AndThenFuture(self.inner.call(request).err_into().and_then(self.f.clone()))
AndThenFuture::new(self.inner.call(request).err_into().and_then(self.f.clone()))
}
}

View File

@ -1,5 +1,5 @@
use futures_core::{ready, Stream};
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
@ -7,15 +7,16 @@ use std::{
};
use tower_service::Service;
/// The [`Future`] returned by the [`ServiceExt::call_all`] combinator.
#[pin_project]
#[derive(Debug)]
pub(crate) struct CallAll<Svc, S, Q> {
service: Option<Svc>,
#[pin]
stream: S,
queue: Q,
eof: bool,
pin_project! {
/// The [`Future`] returned by the [`ServiceExt::call_all`] combinator.
#[derive(Debug)]
pub(crate) struct CallAll<Svc, S, Q> {
service: Option<Svc>,
#[pin]
stream: S,
queue: Q,
eof: bool,
}
}
pub(crate) trait Drive<F: Future> {

View File

@ -6,7 +6,7 @@
use super::common;
use futures_core::Stream;
use futures_util::stream::FuturesOrdered;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
@ -14,84 +14,85 @@ use std::{
};
use tower_service::Service;
/// This is a [`Stream`] of responses resulting from calling the wrapped [`Service`] for each
/// request received on the wrapped [`Stream`].
///
/// ```rust
/// # use std::task::{Poll, Context};
/// # use std::cell::Cell;
/// # use std::error::Error;
/// # use std::rc::Rc;
/// #
/// use futures::future::{ready, Ready};
/// use futures::StreamExt;
/// use futures::channel::mpsc;
/// use tower_service::Service;
/// use tower::util::ServiceExt;
///
/// // First, we need to have a Service to process our requests.
/// #[derive(Debug, Eq, PartialEq)]
/// struct FirstLetter;
/// impl Service<&'static str> for FirstLetter {
/// type Response = &'static str;
/// type Error = Box<dyn Error + Send + Sync>;
/// type Future = Ready<Result<Self::Response, Self::Error>>;
///
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// Poll::Ready(Ok(()))
/// }
///
/// fn call(&mut self, req: &'static str) -> Self::Future {
/// ready(Ok(&req[..1]))
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// // Next, we need a Stream of requests.
// TODO(eliza): when `tokio-util` has a nice way to convert MPSCs to streams,
// tokio::sync::mpsc again?
/// let (mut reqs, rx) = mpsc::unbounded();
/// // Note that we have to help Rust out here by telling it what error type to use.
/// // Specifically, it has to be From<Service::Error> + From<Stream::Error>.
/// let mut rsps = FirstLetter.call_all(rx);
///
/// // Now, let's send a few requests and then check that we get the corresponding responses.
/// reqs.unbounded_send("one").unwrap();
/// reqs.unbounded_send("two").unwrap();
/// reqs.unbounded_send("three").unwrap();
/// drop(reqs);
///
/// // We then loop over the response Strem that we get back from call_all.
/// let mut i = 0usize;
/// while let Some(rsp) = rsps.next().await {
/// // Each response is a Result (we could also have used TryStream::try_next)
/// match (i + 1, rsp.unwrap()) {
/// (1, "o") |
/// (2, "t") |
/// (3, "t") => {}
/// (n, i) => {
/// unreachable!("{}. response was '{}'", n, i);
/// }
/// }
/// i += 1;
/// }
///
/// // And at the end, we can get the Service back when there are no more requests.
/// assert_eq!(rsps.into_inner(), FirstLetter);
/// }
/// ```
///
/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
#[pin_project]
#[derive(Debug)]
pub struct CallAll<Svc, S>
where
Svc: Service<S::Item>,
S: Stream,
{
#[pin]
inner: common::CallAll<Svc, S, FuturesOrdered<Svc::Future>>,
pin_project! {
/// This is a [`Stream`] of responses resulting from calling the wrapped [`Service`] for each
/// request received on the wrapped [`Stream`].
///
/// ```rust
/// # use std::task::{Poll, Context};
/// # use std::cell::Cell;
/// # use std::error::Error;
/// # use std::rc::Rc;
/// #
/// use futures::future::{ready, Ready};
/// use futures::StreamExt;
/// use futures::channel::mpsc;
/// use tower_service::Service;
/// use tower::util::ServiceExt;
///
/// // First, we need to have a Service to process our requests.
/// #[derive(Debug, Eq, PartialEq)]
/// struct FirstLetter;
/// impl Service<&'static str> for FirstLetter {
/// type Response = &'static str;
/// type Error = Box<dyn Error + Send + Sync>;
/// type Future = Ready<Result<Self::Response, Self::Error>>;
///
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// Poll::Ready(Ok(()))
/// }
///
/// fn call(&mut self, req: &'static str) -> Self::Future {
/// ready(Ok(&req[..1]))
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// // Next, we need a Stream of requests.
// TODO(eliza): when `tokio-util` has a nice way to convert MPSCs to streams,
// tokio::sync::mpsc again?
/// let (mut reqs, rx) = mpsc::unbounded();
/// // Note that we have to help Rust out here by telling it what error type to use.
/// // Specifically, it has to be From<Service::Error> + From<Stream::Error>.
/// let mut rsps = FirstLetter.call_all(rx);
///
/// // Now, let's send a few requests and then check that we get the corresponding responses.
/// reqs.unbounded_send("one").unwrap();
/// reqs.unbounded_send("two").unwrap();
/// reqs.unbounded_send("three").unwrap();
/// drop(reqs);
///
/// // We then loop over the response Strem that we get back from call_all.
/// let mut i = 0usize;
/// while let Some(rsp) = rsps.next().await {
/// // Each response is a Result (we could also have used TryStream::try_next)
/// match (i + 1, rsp.unwrap()) {
/// (1, "o") |
/// (2, "t") |
/// (3, "t") => {}
/// (n, i) => {
/// unreachable!("{}. response was '{}'", n, i);
/// }
/// }
/// i += 1;
/// }
///
/// // And at the end, we can get the Service back when there are no more requests.
/// assert_eq!(rsps.into_inner(), FirstLetter);
/// }
/// ```
///
/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
#[derive(Debug)]
pub struct CallAll<Svc, S>
where
Svc: Service<S::Item>,
S: Stream,
{
#[pin]
inner: common::CallAll<Svc, S, FuturesOrdered<Svc::Future>>,
}
}
impl<Svc, S> CallAll<Svc, S>

View File

@ -6,7 +6,7 @@
use super::common;
use futures_core::Stream;
use futures_util::stream::FuturesUnordered;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
@ -14,21 +14,22 @@ use std::{
};
use tower_service::Service;
/// A stream of responses received from the inner service in received order.
///
/// Similar to [`CallAll`] except, instead of yielding responses in request order,
/// responses are returned as they are available.
///
/// [`CallAll`]: crate::util::CallAll
#[pin_project]
#[derive(Debug)]
pub struct CallAllUnordered<Svc, S>
where
Svc: Service<S::Item>,
S: Stream,
{
#[pin]
inner: common::CallAll<Svc, S, FuturesUnordered<Svc::Future>>,
pin_project! {
/// A stream of responses received from the inner service in received order.
///
/// Similar to [`CallAll`] except, instead of yielding responses in request order,
/// responses are returned as they are available.
///
/// [`CallAll`]: crate::util::CallAll
#[derive(Debug)]
pub struct CallAllUnordered<Svc, S>
where
Svc: Service<S::Item>,
S: Stream,
{
#[pin]
inner: common::CallAll<Svc, S, FuturesUnordered<Svc::Future>>,
}
}
impl<Svc, S> CallAllUnordered<Svc, S>

View File

@ -72,7 +72,7 @@ where
#[inline]
fn call(&mut self, request: Request) -> Self::Future {
MapErrFuture(self.inner.call(request).map_err(self.f.clone()))
MapErrFuture::new(self.inner.call(request).map_err(self.f.clone()))
}
}

View File

@ -72,7 +72,7 @@ where
#[inline]
fn call(&mut self, request: Request) -> Self::Future {
MapResponseFuture(self.inner.call(request).map_ok(self.f.clone()))
MapResponseFuture::new(self.inner.call(request).map_ok(self.f.clone()))
}
}

View File

@ -73,7 +73,7 @@ where
#[inline]
fn call(&mut self, request: Request) -> Self::Future {
MapResultFuture(self.inner.call(request).map(self.f.clone()))
MapResultFuture::new(self.inner.call(request).map(self.f.clone()))
}
}

View File

@ -133,7 +133,7 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
/// # struct DatabaseService;
/// # impl DatabaseService {
/// # fn new(address: &str) -> Self {
/// # DatabaseService
/// # DatabaseService
/// # }
/// # }
/// #

View File

@ -1,5 +1,5 @@
use futures_core::ready;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::{
fmt,
future::Future,
@ -8,21 +8,40 @@ use std::{
};
use tower_service::Service;
/// A [`Future`] consuming a [`Service`] and request, waiting until the [`Service`]
/// is ready, and then calling [`Service::call`] with the request, and
/// waiting for that [`Future`].
#[pin_project]
#[derive(Debug)]
pub struct Oneshot<S: Service<Req>, Req> {
#[pin]
state: State<S, Req>,
pin_project! {
/// A [`Future`] consuming a [`Service`] and request, waiting until the [`Service`]
/// is ready, and then calling [`Service::call`] with the request, and
/// waiting for that [`Future`].
#[derive(Debug)]
pub struct Oneshot<S: Service<Req>, Req> {
#[pin]
state: State<S, Req>,
}
}
#[pin_project(project = StateProj)]
enum State<S: Service<Req>, Req> {
NotReady(S, Option<Req>),
Called(#[pin] S::Future),
Done,
pin_project! {
#[project = StateProj]
enum State<S: Service<Req>, Req> {
NotReady {
svc: S,
req: Option<Req>,
},
Called {
#[pin]
fut: S::Future,
},
Done,
}
}
impl<S: Service<Req>, Req> State<S, Req> {
fn not_ready(svc: S, req: Option<Req>) -> Self {
Self::NotReady { svc, req }
}
fn called(fut: S::Future) -> Self {
Self::Called { fut }
}
}
impl<S, Req> fmt::Debug for State<S, Req>
@ -32,13 +51,16 @@ where
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
State::NotReady(s, Some(req)) => f
State::NotReady {
svc,
req: Some(req),
} => f
.debug_tuple("State::NotReady")
.field(s)
.field(svc)
.field(req)
.finish(),
State::NotReady(_, None) => unreachable!(),
State::Called(_) => f.debug_tuple("State::Called").field(&"S::Future").finish(),
State::NotReady { req: None, .. } => unreachable!(),
State::Called { .. } => f.debug_tuple("State::Called").field(&"S::Future").finish(),
State::Done => f.debug_tuple("State::Done").finish(),
}
}
@ -51,7 +73,7 @@ where
#[allow(missing_docs)]
pub fn new(svc: S, req: Req) -> Self {
Oneshot {
state: State::NotReady(svc, Some(req)),
state: State::not_ready(svc, Some(req)),
}
}
}
@ -66,12 +88,12 @@ where
let mut this = self.project();
loop {
match this.state.as_mut().project() {
StateProj::NotReady(svc, req) => {
StateProj::NotReady { svc, req } => {
let _ = ready!(svc.poll_ready(cx))?;
let f = svc.call(req.take().expect("already called"));
this.state.set(State::Called(f));
this.state.set(State::called(f));
}
StateProj::Called(fut) => {
StateProj::Called { fut } => {
let res = ready!(fut.poll(cx))?;
this.state.set(State::Done);
return Poll::Ready(Ok(res));

View File

@ -1,20 +1,21 @@
use super::error;
use futures_core::ready;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
/// Response future returned by [`Optional`].
///
/// [`Optional`]: crate::util::Optional
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
inner: Option<T>,
pin_project! {
/// Response future returned by [`Optional`].
///
/// [`Optional`]: crate::util::Optional
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
inner: Option<T>,
}
}
impl<T> ResponseFuture<T> {

View File

@ -77,7 +77,7 @@ where
#[inline]
fn call(&mut self, request: Request) -> Self::Future {
ThenFuture(self.inner.call(request).then(self.f.clone()))
ThenFuture::new(self.inner.call(request).then(self.f.clone()))
}
}

View File

@ -37,7 +37,7 @@ fn stress() {
let _t = support::trace_init();
let mut task = task::spawn(());
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Result<_, &'static str>>();
let mut cache = Balance::<_, Req>::new(support::IntoStream(rx));
let mut cache = Balance::<_, Req>::new(support::IntoStream::new(rx));
let mut nready = 0;
let mut services = slab::Slab::<(mock::Handle<Req, Req>, bool)>::new();

View File

@ -81,5 +81,6 @@ async fn abort_on_drop() {
// End the task and ensure that the inner service has been dropped.
assert!(drop_tx.send(()).is_ok());
tokio_test::assert_ready!(task.poll());
tokio::task::yield_now().await;
assert!(tokio_test::assert_ready!(handle.poll_request()).is_none());
}

View File

@ -17,15 +17,25 @@ pub(crate) fn trace_init() -> tracing::subscriber::DefaultGuard {
tracing::subscriber::set_default(subscriber)
}
#[pin_project::pin_project]
#[derive(Clone, Debug)]
pub struct IntoStream<S>(#[pin] pub S);
pin_project_lite::pin_project! {
#[derive(Clone, Debug)]
pub struct IntoStream<S> {
#[pin]
inner: S
}
}
impl<S> IntoStream<S> {
pub fn new(inner: S) -> Self {
Self { inner }
}
}
impl<I> Stream for IntoStream<mpsc::Receiver<I>> {
type Item = I;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().0.poll_recv(cx)
self.project().inner.poll_recv(cx)
}
}
@ -33,7 +43,7 @@ impl<I> Stream for IntoStream<mpsc::UnboundedReceiver<I>> {
type Item = I;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().0.poll_recv(cx)
self.project().inner.poll_recv(cx)
}
}

View File

@ -51,7 +51,7 @@ fn ordered() {
admit: admit.clone(),
};
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let ca = srv.call_all(support::IntoStream(rx));
let ca = srv.call_all(support::IntoStream::new(rx));
pin_mut!(ca);
assert_pending!(mock.enter(|cx, _| ca.as_mut().poll_next(cx)));