diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 4194fc2d..550a8832 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -12,7 +12,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - rust: [stable, 1.39.0] + rust: [stable, 1.40.0] steps: - uses: actions/checkout@master - uses: actions-rs/toolchain@v1 @@ -77,7 +77,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - rust: [stable, beta, nightly, 1.39.0] + rust: [stable, beta, nightly, 1.40.0] steps: - uses: actions/checkout@master - uses: actions-rs/toolchain@v1 diff --git a/tower/Cargo.toml b/tower/Cargo.toml index f15eb14e..a3166296 100644 --- a/tower/Cargo.toml +++ b/tower/Cargo.toml @@ -32,7 +32,7 @@ discover = [] filter = [] hedge = ["filter", "futures-util", "hdrhistogram", "tokio/time"] limit = ["tokio/time"] -load = ["discover", "tokio/time"] +load = ["tokio/time"] load-shed = [] make = ["tokio/io-std"] ready-cache = ["futures-util", "indexmap", "tokio/sync"] @@ -73,4 +73,4 @@ all-features = true rustdoc-args = ["--cfg", "docsrs"] [package.metadata.playground] -features = ["full"] \ No newline at end of file +features = ["full"] diff --git a/tower-balance/examples/demo.rs b/tower/examples/tower-balance.rs similarity index 88% rename from tower-balance/examples/demo.rs rename to tower/examples/tower-balance.rs index cc22bd73..eb891c0f 100644 --- a/tower-balance/examples/demo.rs +++ b/tower/examples/tower-balance.rs @@ -1,21 +1,22 @@ //! Exercises load balancers with mocked services. -use futures_core::TryStream; +use futures_core::{Stream, TryStream}; use futures_util::{stream, stream::StreamExt, stream::TryStreamExt}; use hdrhistogram::Histogram; use pin_project::pin_project; use rand::{self, Rng}; +use std::hash::Hash; use std::time::Duration; use std::{ pin::Pin, task::{Context, Poll}, }; use tokio::time::{self, Instant}; +use tower::balance as lb; +use tower::discover::{Change, Discover}; +use tower::limit::concurrency::ConcurrencyLimit; +use tower::load; use tower::util::ServiceExt; -use tower_balance as lb; -use tower_discover::{Change, Discover}; -use tower_limit::concurrency::ConcurrencyLimit; -use tower_load as load; use tower_service::Service; const REQUESTS: usize = 100_000; @@ -61,13 +62,15 @@ async fn main() { d, DEFAULT_RTT, decay, - load::NoInstrument, + load::CompleteOnResponse::default(), )); run("P2C+PeakEWMA...", pe).await; let d = gen_disco(); - let ll = - lb::p2c::Balance::from_entropy(load::PendingRequestsDiscover::new(d, load::NoInstrument)); + let ll = lb::p2c::Balance::from_entropy(load::PendingRequestsDiscover::new( + d, + load::CompleteOnResponse::default(), + )); run("P2C+LeastLoaded...", ll).await; } @@ -78,20 +81,19 @@ type Key = usize; #[pin_project] struct Disco(Vec<(Key, S)>); -impl Discover for Disco +impl Stream for Disco where S: Service, { - type Key = Key; - type Service = S; - type Error = Error; - fn poll_discover( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll, Self::Error>> { + type Item = Result, Error>; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { match self.project().0.pop() { - Some((k, service)) => Poll::Ready(Ok(Change::Insert(k, service))), - None => Poll::Pending, + Some((k, service)) => Poll::Ready(Some(Ok(Change::Insert(k, service)))), + None => { + // there may be more later + Poll::Pending + } } } } @@ -132,7 +134,7 @@ async fn run(name: &'static str, lb: lb::p2c::Balance) where D: Discover + Unpin + Send + 'static, D::Error: Into, - D::Key: Clone + Send, + D::Key: Clone + Send + Hash, D::Service: Service + load::Load + Send, >::Error: Into, >::Future: Send, diff --git a/tower/src/load/completion.rs b/tower/src/load/completion.rs new file mode 100644 index 00000000..d938d7a6 --- /dev/null +++ b/tower/src/load/completion.rs @@ -0,0 +1,92 @@ +//! Application-specific request completion semantics. + +use futures_core::ready; +use pin_project::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +/// Attaches `H`-typed completion tracker to `V` typed values. +/// +/// Handles (of type `H`) are intended to be RAII guards that primarily implement `Drop` and update +/// load metric state as they are dropped. This trait allows implementors to "forward" the handle +/// to later parts of the request-handling pipeline, so that the handle is only dropped when the +/// request has truly completed. +/// +/// This utility allows load metrics to have a protocol-agnostic means to track streams past their +/// initial response future. For example, if `V` represents an HTTP response type, an +/// implementation could add `H`-typed handles to each response's extensions to detect when all the +/// response's extensions have been dropped. +/// +/// A base `impl TrackCompletion for CompleteOnResponse` is provided to drop the handle +/// once the response future is resolved. This is appropriate when a response is discrete and +/// cannot comprise multiple messages. +/// +/// In many cases, the `Output` type is simply `V`. However, `TrackCompletion` may alter the type +/// in order to instrument it appropriately. For example, an HTTP `TrackCompletion` may modify the +/// body type: so a `TrackCompletion` that takes values of type `http::Response` may output +/// values of type `http::Response`. +pub trait TrackCompletion: Clone { + /// The instrumented value type. + type Output; + + /// Attaches a `H`-typed handle to a `V`-typed value. + fn track_completion(&self, handle: H, value: V) -> Self::Output; +} + +/// A `TrackCompletion` implementation that considers the request completed when the response +/// future is resolved. +#[derive(Clone, Copy, Debug, Default)] +#[non_exhaustive] +pub struct CompleteOnResponse; + +/// Attaches a `C`-typed completion tracker to the result of an `F`-typed `Future`. +#[pin_project] +#[derive(Debug)] +pub struct TrackCompletionFuture { + #[pin] + future: F, + handle: Option, + completion: C, +} + +// ===== impl InstrumentFuture ===== + +impl TrackCompletionFuture { + /// Wraps a future, propagating the tracker into its value if successful. + pub fn new(completion: C, handle: H, future: F) -> Self { + TrackCompletionFuture { + future, + completion, + handle: Some(handle), + } + } +} + +impl Future for TrackCompletionFuture +where + F: Future>, + C: TrackCompletion, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let rsp = ready!(this.future.poll(cx))?; + let h = this.handle.take().expect("handle"); + Poll::Ready(Ok(this.completion.track_completion(h, rsp))) + } +} + +// ===== CompleteOnResponse ===== + +impl TrackCompletion for CompleteOnResponse { + type Output = V; + + fn track_completion(&self, handle: H, value: V) -> V { + drop(handle); + value + } +} diff --git a/tower/src/load/constant.rs b/tower/src/load/constant.rs index b9ea6ff4..f153689d 100644 --- a/tower/src/load/constant.rs +++ b/tower/src/load/constant.rs @@ -1,17 +1,20 @@ -//! A constant `Load` implementation. Primarily useful for testing. +//! A constant `Load` implementation. +#[cfg(feature = "discover")] use crate::discover::{Change, Discover}; +#[cfg(feature = "discover")] use futures_core::{ready, Stream}; -use pin_project::pin_project; -use std::{ - pin::Pin, - task::{Context, Poll}, -}; -use tower_service::Service; +#[cfg(feature = "discover")] +use std::pin::Pin; use super::Load; +use pin_project::pin_project; +use std::task::{Context, Poll}; +use tower_service::Service; -/// Wraps a type so that `Load::load` returns a constant value. +/// Wraps a type so that it implements `Load` and returns a constant load metric. +/// +/// This load estimator is primarily useful for testing. #[pin_project] #[derive(Debug)] pub struct Constant { @@ -55,6 +58,7 @@ where } /// Proxies `Discover` such that all changes are wrapped with a constant load. +#[cfg(feature = "discover")] impl Stream for Constant { type Item = Result>, D::Error>; diff --git a/tower/src/load/instrument.rs b/tower/src/load/instrument.rs deleted file mode 100644 index f9a0b9ac..00000000 --- a/tower/src/load/instrument.rs +++ /dev/null @@ -1,86 +0,0 @@ -use futures_core::ready; -use pin_project::pin_project; -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, -}; - -/// Attaches `I`-typed instruments to `V` typed values. -/// -/// This utility allows load metrics to have a protocol-agnostic means to track streams -/// past their initial response future. For example, if `V` represents an HTTP response -/// type, an implementation could add `H`-typed handles to each response's extensions to -/// detect when the response is dropped. -/// -/// Handles are intended to be RAII guards that primarily implement `Drop` and update load -/// metric state as they are dropped. -/// -/// A base `impl Instrument for NoInstrument` is provided to drop the handle -/// immediately. This is appropriate when a response is discrete and cannot comprise -/// multiple messages. -/// -/// In many cases, the `Output` type is simply `V`. However, `Instrument` may alter the -/// type in order to instrument it appropriately. For example, an HTTP Instrument may -/// modify the body type: so an `Instrument` that takes values of type `http::Response` -/// may output values of type `http::Response`. -pub trait Instrument: Clone { - /// The instrumented value type. - type Output; - - /// Attaches an `H`-typed handle to a `V`-typed value. - fn instrument(&self, handle: H, value: V) -> Self::Output; -} - -/// A `Instrument` implementation that drops each instrument immediately. -#[derive(Clone, Copy, Debug)] -pub struct NoInstrument; - -/// Attaches a `I`-typed instruments to the result of an `F`-typed `Future`. -#[pin_project] -#[derive(Debug)] -pub struct InstrumentFuture { - #[pin] - future: F, - handle: Option, - instrument: I, -} - -// ===== impl InstrumentFuture ===== - -impl InstrumentFuture { - /// Wraps a future, instrumenting its value if successful. - pub fn new(instrument: I, handle: H, future: F) -> Self { - InstrumentFuture { - future, - instrument, - handle: Some(handle), - } - } -} - -impl Future for InstrumentFuture -where - F: Future>, - I: Instrument, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let rsp = ready!(this.future.poll(cx))?; - let h = this.handle.take().expect("handle"); - Poll::Ready(Ok(this.instrument.instrument(h, rsp))) - } -} - -// ===== NoInstrument ===== - -impl Instrument for NoInstrument { - type Output = V; - - fn instrument(&self, handle: H, value: V) -> V { - drop(handle); - value - } -} diff --git a/tower/src/load/mod.rs b/tower/src/load/mod.rs index fdd1f192..7b45ab06 100644 --- a/tower/src/load/mod.rs +++ b/tower/src/load/mod.rs @@ -1,22 +1,85 @@ -//! Abstractions and utilties for measuring a service's load. +//! Service load measurement +//! +//! This module provides the [`Load`] trait, which allows measuring how loaded a service is. +//! It also provides several wrapper types that measure load in different ways: +//! +//! - [`Constant`] — Always returns the same constant load value for a service. +//! - [`PendingRequests`] — Measures load by tracking the number of in-flight requests. +//! - [`PeakEwma`] — Measures load using a moving average of the peak latency for the service. +//! +//! In general, you will want to use one of these when using the types in [`tower::balance`] which +//! balance services depending on their load. Which load metric to use depends on your exact +//! use-case, but the ones above should get you quite far! +//! +//! When the `discover` feature is enabled, wrapper types for [`tower::discover::Discover`] that +//! wrap the discovered services with the given load estimator are also provided. +//! +//! # When does a request complete? +//! +//! For many applications, the request life-cycle is relatively simple: when a service responds to +//! a request, that request is done, and the system can forget about it. However, for some +//! applications, the service may respond to the initial request while other parts of the system +//! are still acting on that request. In such an application, the system load must take these +//! requests into account as well, or risk the system underestimating its own load. +//! +//! To support these use-cases, the load estimators in this module are parameterized by the +//! [`TrackCompletion`] trait, with [`CompleteOnResponse`] as the default type. The behavior of +//! `CompleteOnOnResponse` is what you would normally expect for a request-response cycle: when the +//! response is produced, the request is considered "finished", and load goes down. This can be +//! overriden by your own user-defined type to track more complex request completion semantics. See +//! the documentation for [`tower::load::completion`] for more details. +//! +//! # Examples +//! +//! ```rust +//! # #[cfg(feature = "util")] +//! use tower::util::ServiceExt; +//! # #[cfg(feature = "util")] +//! use tower::{load::Load, Service}; +//! # #[cfg(feature = "util")] +//! async fn simple_balance( +//! svc1: &mut S1, +//! svc2: &mut S2, +//! request: R +//! ) -> Result +//! where +//! S1: Load + Service, +//! S2: Load + Service +//! { +//! if svc1.load() < svc2.load() { +//! svc1.ready_and().await?.call(request).await +//! } else { +//! svc2.ready_and().await?.call(request).await +//! } +//! } +//! ``` +// TODO: a custom completion example would be good here +pub mod completion; mod constant; -mod instrument; pub mod peak_ewma; pub mod pending_requests; pub use self::{ + completion::{CompleteOnResponse, TrackCompletion}, constant::Constant, - instrument::{Instrument, InstrumentFuture, NoInstrument}, - peak_ewma::{PeakEwma, PeakEwmaDiscover}, - pending_requests::{PendingRequests, PendingRequestsDiscover}, + peak_ewma::PeakEwma, + pending_requests::PendingRequests, }; -/// Exposes a load metric. +#[cfg(feature = "discover")] +pub use self::{peak_ewma::PeakEwmaDiscover, pending_requests::PendingRequestsDiscover}; + +/// Types that implement this trait can give an estimate of how loaded they are. +/// +/// See the module documentation for more details. pub trait Load { - /// A comparable load metric. Lesser values are "preferable" to greater values. + /// A comparable load metric. + /// + /// Lesser values indicate that the service is less loaded, and should be preferred for new + /// requests over another service with a higher value. type Metric: PartialOrd; - /// Obtains a service's load. + /// Estimate the service's current load. fn load(&self) -> Self::Metric; } diff --git a/tower/src/load/peak_ewma.rs b/tower/src/load/peak_ewma.rs index 2369a844..73397a64 100644 --- a/tower/src/load/peak_ewma.rs +++ b/tower/src/load/peak_ewma.rs @@ -1,14 +1,17 @@ -//! A `Load` implementation that PeakEWMA on response latency. +//! A `Load` implementation that measures load using the PeakEWMA response latency. -use super::Load; -use super::{Instrument, InstrumentFuture, NoInstrument}; +#[cfg(feature = "discover")] use crate::discover::{Change, Discover}; +#[cfg(feature = "discover")] use futures_core::{ready, Stream}; +#[cfg(feature = "discover")] use pin_project::pin_project; -use std::{ - pin::Pin, - task::{Context, Poll}, -}; +#[cfg(feature = "discover")] +use std::pin::Pin; + +use super::completion::{CompleteOnResponse, TrackCompletion, TrackCompletionFuture}; +use super::Load; +use std::task::{Context, Poll}; use std::{ sync::{Arc, Mutex}, time::Duration, @@ -17,7 +20,7 @@ use tokio::time::Instant; use tower_service::Service; use tracing::trace; -/// Wraps an `S`-typed Service with Peak-EWMA load measurement. +/// Measures the load of the underlying service using Peak-EWMA load measurement. /// /// `PeakEwma` implements `Load` with the `Cost` metric that estimates the amount of /// pending work to an endpoint. Work is calculated by multiplying the @@ -26,11 +29,6 @@ use tracing::trace; /// worst-case latencies. Over time, the peak latency value decays towards the moving /// average of latencies to the endpoint. /// -/// As requests are sent to the underlying service, an `I`-typed instrumentation strategy -/// is used to track responses to measure latency in an application-specific way. The -/// default strategy measures latency as the elapsed time from the request being issued to -/// the underlying service to the response future being satisfied (or dropped). -/// /// When no latency information has been measured for an endpoint, an arbitrary default /// RTT of 1 second is used to prevent the endpoint from being overloaded before a /// meaningful baseline can be established.. @@ -43,22 +41,23 @@ use tracing::trace; /// [finagle]: /// https://github.com/twitter/finagle/blob/9cc08d15216497bb03a1cafda96b7266cfbbcff1/finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala #[derive(Debug)] -pub struct PeakEwma { +pub struct PeakEwma { service: S, decay_ns: f64, rtt_estimate: Arc>, - instrument: I, + completion: C, } -/// Wraps a `D`-typed stream of discovery updates with `PeakEwma`. +/// Wraps a `D`-typed stream of discovered services with `PeakEwma`. #[pin_project] #[derive(Debug)] -pub struct PeakEwmaDiscover { +#[cfg(feature = "discover")] +pub struct PeakEwmaDiscover { #[pin] discover: D, decay_ns: f64, default_rtt: Duration, - instrument: I, + completion: C, } /// Represents the relative cost of communicating with a service. @@ -87,65 +86,14 @@ const NANOS_PER_MILLI: f64 = 1_000_000.0; // ===== impl PeakEwma ===== -impl PeakEwmaDiscover { - /// Wraps a `D`-typed `Discover` so that services have a `PeakEwma` load metric. - /// - /// The provided `default_rtt` is used as the default RTT estimate for newly - /// added services. - /// - /// They `decay` value determines over what time period a RTT estimate should - /// decay. - pub fn new(discover: D, default_rtt: Duration, decay: Duration, instrument: I) -> Self - where - D: Discover, - D::Service: Service, - I: Instrument>::Response>, - { - PeakEwmaDiscover { - discover, - decay_ns: nanos(decay), - default_rtt, - instrument, - } - } -} - -impl Stream for PeakEwmaDiscover -where - D: Discover, - I: Clone, -{ - type Item = Result>, D::Error>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - let change = match ready!(this.discover.poll_discover(cx)).transpose()? { - None => return Poll::Ready(None), - Some(Change::Remove(k)) => Change::Remove(k), - Some(Change::Insert(k, svc)) => { - let peak_ewma = PeakEwma::new( - svc, - *this.default_rtt, - *this.decay_ns, - this.instrument.clone(), - ); - Change::Insert(k, peak_ewma) - } - }; - - Poll::Ready(Some(Ok(change))) - } -} - -// ===== impl PeakEwma ===== - -impl PeakEwma { - fn new(service: S, default_rtt: Duration, decay_ns: f64, instrument: I) -> Self { +impl PeakEwma { + /// Wraps an `S`-typed service so that its load is tracked by the EWMA of its peak latency. + pub fn new(service: S, default_rtt: Duration, decay_ns: f64, completion: C) -> Self { Self { service, decay_ns, rtt_estimate: Arc::new(Mutex::new(RttEstimate::new(nanos(default_rtt)))), - instrument, + completion, } } @@ -158,29 +106,29 @@ impl PeakEwma { } } -impl Service for PeakEwma +impl Service for PeakEwma where S: Service, - I: Instrument, + C: TrackCompletion, { - type Response = I::Output; + type Response = C::Output; type Error = S::Error; - type Future = InstrumentFuture; + type Future = TrackCompletionFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.service.poll_ready(cx) } fn call(&mut self, req: Request) -> Self::Future { - InstrumentFuture::new( - self.instrument.clone(), + TrackCompletionFuture::new( + self.completion.clone(), self.handle(), self.service.call(req), ) } } -impl Load for PeakEwma { +impl Load for PeakEwma { type Metric = Cost; fn load(&self) -> Self::Metric { @@ -201,13 +149,67 @@ impl Load for PeakEwma { } } -impl PeakEwma { +impl PeakEwma { fn update_estimate(&self) -> f64 { let mut rtt = self.rtt_estimate.lock().expect("peak ewma prior_estimate"); rtt.decay(self.decay_ns) } } +// ===== impl PeakEwmaDiscover ===== + +#[cfg(feature = "discover")] +impl PeakEwmaDiscover { + /// Wraps a `D`-typed `Discover` so that services have a `PeakEwma` load metric. + /// + /// The provided `default_rtt` is used as the default RTT estimate for newly + /// added services. + /// + /// They `decay` value determines over what time period a RTT estimate should + /// decay. + pub fn new(discover: D, default_rtt: Duration, decay: Duration, completion: C) -> Self + where + D: Discover, + D::Service: Service, + C: TrackCompletion>::Response>, + { + PeakEwmaDiscover { + discover, + decay_ns: nanos(decay), + default_rtt, + completion, + } + } +} + +#[cfg(feature = "discover")] +impl Stream for PeakEwmaDiscover +where + D: Discover, + C: Clone, +{ + type Item = Result>, D::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + let change = match ready!(this.discover.poll_discover(cx)).transpose()? { + None => return Poll::Ready(None), + Some(Change::Remove(k)) => Change::Remove(k), + Some(Change::Insert(k, svc)) => { + let peak_ewma = PeakEwma::new( + svc, + *this.default_rtt, + *this.decay_ns, + this.completion.clone(), + ); + Change::Insert(k, peak_ewma) + } + }; + + Poll::Ready(Some(Ok(change))) + } +} + // ===== impl RttEstimate ===== impl RttEstimate { @@ -336,7 +338,7 @@ mod tests { Svc, Duration::from_millis(10), NANOS_PER_MILLI * 1_000.0, - NoInstrument, + CompleteOnResponse, ); let Cost(load) = svc.load(); assert_eq!(load, 10.0 * NANOS_PER_MILLI); @@ -360,7 +362,7 @@ mod tests { Svc, Duration::from_millis(20), NANOS_PER_MILLI * 1_000.0, - NoInstrument, + CompleteOnResponse, ); assert_eq!(svc.load(), Cost(20.0 * NANOS_PER_MILLI)); diff --git a/tower/src/load/pending_requests.rs b/tower/src/load/pending_requests.rs index a53950c8..ec7b9886 100644 --- a/tower/src/load/pending_requests.rs +++ b/tower/src/load/pending_requests.rs @@ -1,37 +1,40 @@ -//! A `Load` implementation that uses the count of in-flight requests. +//! A `Load` implementation that measures load using the number of in-flight requests. -use super::Load; -use super::{Instrument, InstrumentFuture, NoInstrument}; +#[cfg(feature = "discover")] use crate::discover::{Change, Discover}; +#[cfg(feature = "discover")] use futures_core::{ready, Stream}; +#[cfg(feature = "discover")] use pin_project::pin_project; +#[cfg(feature = "discover")] +use std::pin::Pin; + +use super::completion::{CompleteOnResponse, TrackCompletion, TrackCompletionFuture}; +use super::Load; use std::sync::Arc; -use std::{ - pin::Pin, - task::{Context, Poll}, -}; +use std::task::{Context, Poll}; use tower_service::Service; -/// Expresses load based on the number of currently-pending requests. +/// Measures the load of the underlying service using the number of currently-pending requests. #[derive(Debug)] -pub struct PendingRequests { +pub struct PendingRequests { service: S, ref_count: RefCount, - instrument: I, + completion: C, } -/// Shared between instances of `PendingRequests` and `Handle` to track active -/// references. +/// Shared between instances of `PendingRequests` and `Handle` to track active references. #[derive(Clone, Debug, Default)] struct RefCount(Arc<()>); -/// Wraps `inner`'s services with `PendingRequests`. +/// Wraps a `D`-typed stream of discovered services with `PendingRequests`. #[pin_project] #[derive(Debug)] -pub struct PendingRequestsDiscover { +#[cfg(feature = "discover")] +pub struct PendingRequestsDiscover { #[pin] discover: D, - instrument: I, + completion: C, } /// Represents the number of currently-pending requests to a given service. @@ -44,11 +47,12 @@ pub struct Handle(RefCount); // ===== impl PendingRequests ===== -impl PendingRequests { - fn new(service: S, instrument: I) -> Self { +impl PendingRequests { + /// Wraps an `S`-typed service so that its load is tracked by the number of pending requests. + pub fn new(service: S, completion: C) -> Self { Self { service, - instrument, + completion, ref_count: RefCount::default(), } } @@ -58,7 +62,7 @@ impl PendingRequests { } } -impl Load for PendingRequests { +impl Load for PendingRequests { type Metric = Count; fn load(&self) -> Count { @@ -67,22 +71,22 @@ impl Load for PendingRequests { } } -impl Service for PendingRequests +impl Service for PendingRequests where S: Service, - I: Instrument, + C: TrackCompletion, { - type Response = I::Output; + type Response = C::Output; type Error = S::Error; - type Future = InstrumentFuture; + type Future = TrackCompletionFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.service.poll_ready(cx) } fn call(&mut self, req: Request) -> Self::Future { - InstrumentFuture::new( - self.instrument.clone(), + TrackCompletionFuture::new( + self.completion.clone(), self.handle(), self.service.call(req), ) @@ -91,27 +95,29 @@ where // ===== impl PendingRequestsDiscover ===== -impl PendingRequestsDiscover { +#[cfg(feature = "discover")] +impl PendingRequestsDiscover { /// Wraps a `Discover``, wrapping all of its services with `PendingRequests`. - pub fn new(discover: D, instrument: I) -> Self + pub fn new(discover: D, completion: C) -> Self where D: Discover, D::Service: Service, - I: Instrument>::Response>, + C: TrackCompletion>::Response>, { Self { discover, - instrument, + completion, } } } -impl Stream for PendingRequestsDiscover +#[cfg(feature = "discover")] +impl Stream for PendingRequestsDiscover where D: Discover, - I: Clone, + C: Clone, { - type Item = Result>, D::Error>; + type Item = Result>, D::Error>; /// Yields the next discovery change set. fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -120,7 +126,7 @@ where let this = self.project(); let change = match ready!(this.discover.poll_discover(cx)).transpose()? { None => return Poll::Ready(None), - Some(Insert(k, svc)) => Insert(k, PendingRequests::new(svc, this.instrument.clone())), + Some(Insert(k, svc)) => Insert(k, PendingRequests::new(svc, this.completion.clone())), Some(Remove(k)) => Remove(k), }; @@ -159,7 +165,7 @@ mod tests { #[test] fn default() { - let mut svc = PendingRequests::new(Svc, NoInstrument); + let mut svc = PendingRequests::new(Svc, CompleteOnResponse); assert_eq!(svc.load(), Count(0)); let rsp0 = svc.call(()); @@ -176,12 +182,12 @@ mod tests { } #[test] - fn instrumented() { + fn with_completion() { #[derive(Clone)] struct IntoHandle; - impl Instrument for IntoHandle { + impl TrackCompletion for IntoHandle { type Output = Handle; - fn instrument(&self, i: Handle, (): ()) -> Handle { + fn track_completion(&self, i: Handle, (): ()) -> Handle { i } }