Update tower-balance to std::future (#335)

This bumps tower-balance to 0.3.0-alpha.1

It also adds delegate impls for `Discover` through `Pin`, and makes `tower-load::Constant: Debug`.
This commit is contained in:
Jon Gjengset 2019-09-10 18:15:32 -04:00 committed by GitHub
parent 1ca999fde1
commit 87976ae418
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 507 additions and 502 deletions

View File

@ -2,7 +2,7 @@
members = [
"tower",
# "tower-balance",
"tower-balance",
"tower-buffer",
"tower-discover",
"tower-filter",

View File

@ -8,13 +8,13 @@ name = "tower-balance"
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.1.0"
version = "0.3.0-alpha.1"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-balance/0.1.0"
documentation = "https://docs.rs/tower-balance/0.3.0-alpha.1"
description = """
Balance load across a set of uniform services.
"""
@ -27,26 +27,30 @@ log = ["tracing/log"]
default = ["log"]
[dependencies]
futures = "0.1.26"
futures-util-preview = "0.3.0-alpha.18"
futures-core-preview = "0.3.0-alpha.18"
pin-project = "0.4.0-alpha.10"
indexmap = "1.0.2"
tracing = "0.1"
rand = "0.6.5"
tokio-sync = "0.1.3"
tokio-timer = "0.2.4"
tower-discover = "0.1.0"
tower-layer = "0.1.0"
tower-load = { version = "0.1.0", path = "../tower-load" }
tower-service = "0.2.0"
tower-util = "0.1.0"
tokio-sync = "0.2.0-alpha.4"
tokio-timer = "0.3.0-alpha.4"
tower-discover = { version = "0.3.0-alpha.1", path = "../tower-discover" }
tower-layer = { version = "0.3.0-alpha.1", path = "../tower-layer" }
tower-load = { version = "0.3.0-alpha.1", path = "../tower-load" }
tower-service = "0.3.0-alpha.1"
tower-make = { version = "0.3.0-alpha.1", path = "../tower-make" }
tower-util = { version = "0.3.0-alpha.1", path = "../tower-util" }
slab = "0.4"
[dev-dependencies]
tracing-fmt = { git = "https://github.com/tokio-rs/tracing.git" }
tracing-subscriber = "0.1.1"
hdrhistogram = "6.0"
quickcheck = { version = "0.6", default-features = false }
tokio = "0.1.7"
tokio-executor = "0.1.2"
tower = { version = "*", path = "../tower" }
tower-buffer = { version = "*", path = "../tower-buffer" }
tower-limit = { version = "*", path = "../tower-limit" }
tower-test = { version = "*", path = "../tower-test" }
tokio = "0.2.0-alpha.4"
tokio-executor = "0.2.0-alpha.4"
tokio-test = "0.2.0-alpha.4"
tower-buffer = { version = "0.3.0-alpha.1", path = "../tower-buffer" }
tower-limit = { version = "0.3.0-alpha.1", path = "../tower-limit" }
tower-test = { version = "0.3.0-alpha.1", path = "../tower-test" }
tower = { version = "0.3.0-alpha.1", path = "../tower" }

View File

@ -1,17 +1,22 @@
//! Exercises load balancers with mocked services.
use futures::{future, stream, Async, Future, Poll, Stream};
use futures_core::TryStream;
use futures_util::{stream, stream::StreamExt, try_stream::TryStreamExt};
use hdrhistogram::Histogram;
use pin_project::pin_project;
use rand::{self, Rng};
use std::time::{Duration, Instant};
use tokio::{runtime, timer};
use tower::{
discover::{Change, Discover},
limit::concurrency::ConcurrencyLimit,
Service, ServiceExt,
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::timer;
use tower::util::ServiceExt;
use tower_balance as lb;
use tower_discover::{Change, Discover};
use tower_limit::concurrency::ConcurrencyLimit;
use tower_load as load;
use tower_service::Service;
const REQUESTS: usize = 100_000;
const CONCURRENCY: usize = 500;
@ -36,8 +41,9 @@ struct Summary {
count_by_instance: [usize; 10],
}
fn main() {
tracing::subscriber::set_global_default(tracing_fmt::FmtSubscriber::default()).unwrap();
#[tokio::main]
async fn main() {
tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::default()).unwrap();
println!("REQUESTS={}", REQUESTS);
println!("CONCURRENCY={}", CONCURRENCY);
@ -49,37 +55,27 @@ fn main() {
}
println!("]");
let mut rt = runtime::Runtime::new().unwrap();
let decay = Duration::from_secs(10);
let d = gen_disco();
let pe = lb::p2c::Balance::from_entropy(load::PeakEwmaDiscover::new(
d,
DEFAULT_RTT,
decay,
load::NoInstrument,
));
run("P2C+PeakEWMA...", pe).await;
let fut = future::lazy(move || {
let decay = Duration::from_secs(10);
let d = gen_disco();
let pe = lb::p2c::Balance::from_entropy(load::PeakEwmaDiscover::new(
d,
DEFAULT_RTT,
decay,
load::NoInstrument,
));
run("P2C+PeakEWMA...", pe)
});
let fut = fut.then(move |_| {
let d = gen_disco();
let ll = lb::p2c::Balance::from_entropy(load::PendingRequestsDiscover::new(
d,
load::NoInstrument,
));
run("P2C+LeastLoaded...", ll)
});
rt.spawn(fut);
rt.shutdown_on_idle().wait().unwrap();
let d = gen_disco();
let ll =
lb::p2c::Balance::from_entropy(load::PendingRequestsDiscover::new(d, load::NoInstrument));
run("P2C+LeastLoaded...", ll).await;
}
type Error = Box<dyn std::error::Error + Send + Sync>;
type Key = usize;
#[pin_project]
struct Disco<S>(Vec<(Key, S)>);
impl<S> Discover for Disco<S>
@ -89,10 +85,13 @@ where
type Key = Key;
type Service = S;
type Error = Error;
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::Error> {
match self.0.pop() {
Some((k, service)) => Ok(Change::Insert(k, service).into()),
None => Ok(Async::NotReady),
fn poll_discover(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
match self.project().0.pop() {
Some((k, service)) => Poll::Ready(Ok(Change::Insert(k, service))),
None => Poll::Pending,
}
}
}
@ -116,12 +115,11 @@ fn gen_disco() -> impl Discover<
.saturating_add(latency.as_secs().saturating_mul(1_000));
let latency = Duration::from_millis(rand::thread_rng().gen_range(0, maxms));
timer::Delay::new(start + latency)
.map_err(Into::into)
.map(move |_| {
let latency = start.elapsed();
Rsp { latency, instance }
})
async move {
timer::delay(start + latency).await;
let latency = start.elapsed();
Ok(Rsp { latency, instance })
}
});
(instance, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY))
@ -130,9 +128,9 @@ fn gen_disco() -> impl Discover<
)
}
fn run<D>(name: &'static str, lb: lb::p2c::Balance<D, Req>) -> impl Future<Item = (), Error = ()>
async fn run<D>(name: &'static str, lb: lb::p2c::Balance<D, Req>)
where
D: Discover + Send + 'static,
D: Discover + Unpin + Send + 'static,
D::Error: Into<Error>,
D::Key: Clone + Send,
D::Service: Service<Req, Response = Rsp> + load::Load + Send,
@ -142,21 +140,22 @@ where
{
println!("{}", name);
let requests = stream::repeat::<_, Error>(Req).take(REQUESTS as u64);
let requests = stream::repeat(Req).take(REQUESTS as u64);
let service = ConcurrencyLimit::new(lb, CONCURRENCY);
let responses = service.call_all(requests).unordered();
compute_histo(responses).map(|s| s.report()).map_err(|_| {})
compute_histo(responses).await.unwrap().report();
}
fn compute_histo<S>(times: S) -> impl Future<Item = Summary, Error = Error> + 'static
async fn compute_histo<S>(mut times: S) -> Result<Summary, Error>
where
S: Stream<Item = Rsp, Error = Error> + 'static,
S: TryStream<Ok = Rsp, Error = Error> + 'static + Unpin,
{
times.fold(Summary::new(), |mut summary, rsp| {
let mut summary = Summary::new();
while let Some(rsp) = times.try_next().await? {
summary.count(rsp);
Ok(summary) as Result<_, Error>
})
}
Ok(summary)
}
impl Summary {

View File

@ -1,6 +1,6 @@
//! Load balancing middlewares.
#![doc(html_root_url = "https://docs.rs/tower-balance/0.1.0")]
#![doc(html_root_url = "https://docs.rs/tower-balance/0.3.0-alpha.1")]
#![deny(missing_docs)]
#![deny(rust_2018_idioms)]
#![allow(elided_lifetimes_in_paths)]

View File

@ -1,7 +1,13 @@
use super::Balance;
use futures::{try_ready, Future, Poll};
use futures_core::ready;
use pin_project::pin_project;
use rand::{rngs::SmallRng, FromEntropy};
use std::marker::PhantomData;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tower_discover::Discover;
use tower_service::Service;
@ -13,8 +19,10 @@ pub struct BalanceMake<S, Req> {
_marker: PhantomData<fn(Req)>,
}
#[pin_project]
/// Makes a balancer instance.
pub struct MakeFuture<F, Req> {
#[pin]
inner: F,
rng: SmallRng,
_marker: PhantomData<fn(Req)>,
@ -45,8 +53,8 @@ where
type Error = S::Error;
type Future = MakeFuture<S::Future, Req>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready()
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, target: Target) -> Self::Future {
@ -58,18 +66,18 @@ where
}
}
impl<F, Req> Future for MakeFuture<F, Req>
impl<F, T, E, Req> Future for MakeFuture<F, Req>
where
F: Future,
F::Item: Discover,
<F::Item as Discover>::Service: Service<Req>,
F: Future<Output = Result<T, E>>,
T: Discover,
<T as Discover>::Service: Service<Req>,
{
type Item = Balance<F::Item, Req>;
type Error = F::Error;
type Output = Result<Balance<T, Req>, E>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let inner = try_ready!(self.inner.poll());
let svc = Balance::new(inner, self.rng.clone());
Ok(svc.into())
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let inner = ready!(this.inner.poll(cx))?;
let svc = Balance::new(inner, this.rng.clone());
Poll::Ready(Ok(svc))
}
}

View File

@ -1,7 +1,14 @@
use crate::error;
use futures::{future, stream, try_ready, Async, Future, Poll, Stream};
use futures_core::{ready, Stream};
use futures_util::{stream, try_future, try_future::TryFutureExt};
use indexmap::IndexMap;
use pin_project::pin_project;
use rand::{rngs::SmallRng, FromEntropy};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio_sync::oneshot;
use tower_discover::{Change, Discover};
use tower_load::Load;
@ -20,8 +27,15 @@ use tracing::{debug, trace};
/// > The maximum load variance between any two servers is bound by `ln(ln(n))` where
/// > `n` is the number of servers in the cluster.
///
/// Note that `Balance` requires that the `Discover` you use is `Unpin` in order to implement
/// `Service`. This is because it needs to be accessed from `Service::poll_ready`, which takes
/// `&mut self`. You can achieve this easily by wrapping your `Discover` in [`Box::pin`] before you
/// construct the `Balance` instance. For more details, see [#319].
///
/// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
/// [`Box::pin`]: https://doc.rust-lang.org/std/boxed/struct.Box.html#method.pin
/// [#319]: https://github.com/tower-rs/tower/issues/319
#[derive(Debug)]
pub struct Balance<D: Discover, Req> {
discover: D,
@ -38,13 +52,16 @@ pub struct Balance<D: Discover, Req> {
rng: SmallRng,
}
#[pin_project]
/// A Future that becomes satisfied when an `S`-typed service is ready.
///
/// May fail due to cancelation, i.e. if the service is removed from discovery.
#[derive(Debug)]
struct UnreadyService<K, S, Req> {
key: Option<K>,
#[pin]
cancel: oneshot::Receiver<()>,
#[pin]
ready: tower_util::Ready<S, Req>,
}
@ -88,7 +105,7 @@ where
impl<D, Req> Balance<D, Req>
where
D: Discover,
D: Discover + Unpin,
D::Key: Clone,
D::Error: Into<error::Error>,
D::Service: Service<Req> + Load,
@ -98,10 +115,12 @@ where
/// Polls `discover` for updates, adding new items to `not_ready`.
///
/// Removals may alter the order of either `ready` or `not_ready`.
fn poll_discover(&mut self) -> Poll<(), error::Discover> {
fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), error::Discover>> {
debug!("updating from discover");
loop {
match try_ready!(self.discover.poll().map_err(|e| error::Discover(e.into()))) {
match ready!(Pin::new(&mut self.discover).poll_discover(cx))
.map_err(|e| error::Discover(e.into()))?
{
Change::Remove(key) => {
trace!("remove");
self.evict(&key)
@ -132,23 +151,25 @@ where
.next_ready_index
.and_then(|i| Self::repair_index(i, idx, self.ready_services.len()));
debug_assert!(!self.cancelations.contains_key(key));
} else if let Some(cancel) = self.cancelations.remove(key) {
} else if let Some(cancel) = self.cancelations.swap_remove(key) {
let _ = cancel.send(());
}
}
fn poll_unready(&mut self) {
fn poll_unready(&mut self, cx: &mut Context<'_>) {
loop {
match self.unready_services.poll() {
Ok(Async::NotReady) | Ok(Async::Ready(None)) => return,
Ok(Async::Ready(Some((key, svc)))) => {
match Pin::new(&mut self.unready_services).poll_next(cx) {
Poll::Pending | Poll::Ready(None) => return,
Poll::Ready(Some(Ok((key, svc)))) => {
trace!("endpoint ready");
let _cancel = self.cancelations.remove(&key);
let _cancel = self.cancelations.swap_remove(&key);
debug_assert!(_cancel.is_some(), "missing cancelation");
self.ready_services.insert(key, svc);
}
Err((key, Error::Canceled)) => debug_assert!(!self.cancelations.contains_key(&key)),
Err((key, Error::Inner(e))) => {
Poll::Ready(Some(Err((key, Error::Canceled)))) => {
debug_assert!(!self.cancelations.contains_key(&key))
}
Poll::Ready(Some(Err((key, Error::Inner(e))))) => {
let error = e.into();
debug!({ %error }, "dropping failed endpoint");
let _cancel = self.cancelations.swap_remove(&key);
@ -207,31 +228,35 @@ where
svc.load()
}
fn poll_ready_index_or_evict(&mut self, index: usize) -> Poll<(), ()> {
fn poll_ready_index_or_evict(
&mut self,
cx: &mut Context<'_>,
index: usize,
) -> Poll<Result<(), ()>> {
let (_, svc) = self
.ready_services
.get_index_mut(index)
.expect("invalid index");
match svc.poll_ready() {
Ok(Async::Ready(())) => Ok(Async::Ready(())),
Ok(Async::NotReady) => {
match svc.poll_ready(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Pending => {
// became unready; so move it back there.
let (key, svc) = self
.ready_services
.swap_remove_index(index)
.expect("invalid ready index");
self.push_unready(key, svc);
Ok(Async::NotReady)
Poll::Pending
}
Err(e) => {
Poll::Ready(Err(e)) => {
// failed, so drop it.
let error = e.into();
debug!({ %error }, "evicting failed endpoint");
self.ready_services
.swap_remove_index(index)
.expect("invalid ready index");
Err(())
Poll::Ready(Err(()))
}
}
}
@ -239,7 +264,7 @@ where
impl<D, Req> Service<Req> for Balance<D, Req>
where
D: Discover,
D: Discover + Unpin,
D::Key: Clone,
D::Error: Into<error::Error>,
D::Service: Service<Req> + Load,
@ -248,7 +273,7 @@ where
{
type Response = <D::Service as Service<Req>>::Response;
type Error = error::Error;
type Future = future::MapErr<
type Future = try_future::MapErr<
<D::Service as Service<Req>>::Future,
fn(<D::Service as Service<Req>>::Error) -> error::Error,
>;
@ -257,13 +282,13 @@ where
///
/// When `Async::Ready` is returned, `ready_index` is set with a valid index
/// into `ready` referring to a `Service` that is ready to disptach a request.
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// First and foremost, process discovery updates. This removes or updates a
// previously-selected `ready_index` if appropriate.
self.poll_discover()?;
let _ = self.poll_discover(cx)?;
// Drive new or busy services to readiness.
self.poll_unready();
self.poll_unready(cx);
trace!({ nready = self.ready_services.len(), nunready = self.unready_services.len() }, "poll_ready");
loop {
@ -276,8 +301,8 @@ where
trace!({ next.idx = index }, "preselected ready_index");
debug_assert!(index < self.ready_services.len());
if let Ok(Async::Ready(())) = self.poll_ready_index_or_evict(index) {
return Ok(Async::Ready(()));
if let Poll::Ready(Ok(())) = self.poll_ready_index_or_evict(cx, index) {
return Poll::Ready(Ok(()));
}
self.next_ready_index = None;
@ -286,7 +311,7 @@ where
self.next_ready_index = self.p2c_next_ready_index();
if self.next_ready_index.is_none() {
debug_assert!(self.ready_services.is_empty());
return Ok(Async::NotReady);
return Poll::Pending;
}
}
}
@ -307,24 +332,24 @@ where
}
impl<K, S: Service<Req>, Req> Future for UnreadyService<K, S, Req> {
type Item = (K, S);
type Error = (K, Error<S::Error>);
type Output = Result<(K, S), (K, Error<S::Error>)>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Ok(Async::Ready(())) = self.cancel.poll() {
let key = self.key.take().expect("polled after ready");
return Err((key, Error::Canceled));
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Poll::Ready(Ok(())) = this.cancel.poll(cx) {
let key = this.key.take().expect("polled after ready");
return Poll::Ready(Err((key, Error::Canceled)));
}
match self.ready.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(svc)) => {
let key = self.key.take().expect("polled after ready");
Ok((key, svc).into())
match ready!(this.ready.poll(cx)) {
Ok(svc) => {
let key = this.key.take().expect("polled after ready");
Poll::Ready(Ok((key, svc)))
}
Err(e) => {
let key = self.key.take().expect("polled after ready");
Err((key, Error::Inner(e)))
let key = this.key.take().expect("polled after ready");
Poll::Ready(Err((key, Error::Inner(e))))
}
}
}

View File

@ -1,128 +1,122 @@
use futures::{Async, Future};
use futures_util::pin_mut;
use std::{future::Future, task::Poll};
use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task};
use tower_discover::ServiceList;
use tower_load as load;
use tower_service::Service;
use tower_test::mock;
use tower_test::{assert_request_eq, mock};
use super::*;
macro_rules! assert_ready {
($svc:expr) => {{
assert_ready!($svc, "must be ready");
}};
($svc:expr, $msg:expr) => {{
assert!($svc.poll_ready().expect("must not fail").is_ready(), $msg);
}};
}
macro_rules! assert_not_ready {
($svc:expr) => {{
assert_not_ready!($svc, "must not be ready");
}};
($svc:expr, $msg:expr) => {{
assert!(!$svc.poll_ready().expect("must not fail").is_ready(), $msg);
}};
}
#[test]
fn empty() {
let empty: Vec<load::Constant<mock::Mock<(), &'static str>, usize>> = vec![];
let disco = ServiceList::new(empty);
let mut svc = Balance::from_entropy(disco);
with_task(|| {
assert_not_ready!(svc);
})
task::mock(|cx| {
let empty: Vec<load::Constant<mock::Mock<(), &'static str>, usize>> = vec![];
let disco = ServiceList::new(empty);
let mut svc = Balance::from_entropy(disco);
assert_pending!(svc.poll_ready(cx));
});
}
#[test]
fn single_endpoint() {
let (mock, mut handle) = mock::pair();
let mock = load::Constant::new(mock, 0);
task::mock(|cx| {
let (mock, handle) = mock::pair();
pin_mut!(handle);
let disco = ServiceList::new(vec![mock].into_iter());
let mut svc = Balance::from_entropy(disco);
let mock = load::Constant::new(mock, 0);
let disco = ServiceList::new(vec![mock].into_iter());
let mut svc = Balance::from_entropy(disco);
with_task(|| {
handle.allow(0);
assert_not_ready!(svc);
assert_pending!(svc.poll_ready(cx));
assert_eq!(svc.len(), 1, "balancer must have discovered endpoint");
handle.allow(1);
assert_ready!(svc);
assert_ready_ok!(svc.poll_ready(cx));
let fut = svc.call(());
pin_mut!(fut);
let ((), rsp) = handle.next_request().unwrap();
rsp.send_response(1);
assert_request_eq!(handle, ()).send_response(1);
assert_eq!(fut.wait().expect("call must complete"), 1);
assert_eq!(assert_ready_ok!(fut.poll(cx)), 1);
handle.allow(1);
assert_ready!(svc);
assert_ready_ok!(svc.poll_ready(cx));
handle.send_error("endpoint lost");
assert_not_ready!(svc);
assert_pending!(svc.poll_ready(cx));
assert!(svc.len() == 0, "balancer must drop failed endpoints");
});
}
#[test]
fn two_endpoints_with_equal_load() {
let (mock_a, mut handle_a) = mock::pair();
let (mock_b, mut handle_b) = mock::pair();
let mock_a = load::Constant::new(mock_a, 1);
let mock_b = load::Constant::new(mock_b, 1);
task::mock(|cx| {
let (mock_a, handle_a) = mock::pair();
let (mock_b, handle_b) = mock::pair();
let mock_a = load::Constant::new(mock_a, 1);
let mock_b = load::Constant::new(mock_b, 1);
let disco = ServiceList::new(vec![mock_a, mock_b].into_iter());
let mut svc = Balance::from_entropy(disco);
pin_mut!(handle_a);
pin_mut!(handle_b);
let disco = ServiceList::new(vec![mock_a, mock_b].into_iter());
let mut svc = Balance::from_entropy(disco);
with_task(|| {
handle_a.allow(0);
handle_b.allow(0);
assert_not_ready!(svc);
assert_pending!(svc.poll_ready(cx));
assert_eq!(svc.len(), 2, "balancer must have discovered both endpoints");
handle_a.allow(1);
handle_b.allow(0);
assert_ready!(svc, "must be ready when one of two services is ready");
assert_ready_ok!(
svc.poll_ready(cx),
"must be ready when one of two services is ready"
);
{
let fut = svc.call(());
let ((), rsp) = handle_a.next_request().unwrap();
rsp.send_response("a");
assert_eq!(fut.wait().expect("call must complete"), "a");
pin_mut!(fut);
assert_request_eq!(handle_a, ()).send_response("a");
assert_eq!(assert_ready_ok!(fut.poll(cx)), "a");
}
handle_a.allow(0);
handle_b.allow(1);
assert_ready!(svc, "must be ready when both endpoints are ready");
assert_ready_ok!(
svc.poll_ready(cx),
"must be ready when both endpoints are ready"
);
{
let fut = svc.call(());
let ((), rsp) = handle_b.next_request().unwrap();
rsp.send_response("b");
assert_eq!(fut.wait().expect("call must complete"), "b");
pin_mut!(fut);
assert_request_eq!(handle_b, ()).send_response("b");
assert_eq!(assert_ready_ok!(fut.poll(cx)), "b");
}
handle_a.allow(1);
handle_b.allow(1);
for _ in 0..2 {
assert_ready!(svc, "must be ready when both endpoints are ready");
assert_ready_ok!(
svc.poll_ready(cx),
"must be ready when both endpoints are ready"
);
let fut = svc.call(());
pin_mut!(fut);
for (ref mut h, c) in &mut [(&mut handle_a, "a"), (&mut handle_b, "b")] {
if let Async::Ready(Some((_, tx))) = h.poll_request().unwrap() {
if let Poll::Ready(Some((_, tx))) = h.as_mut().poll_request(cx) {
tracing::info!("using {}", c);
tx.send_response(c);
h.allow(0);
}
}
fut.wait().expect("call must complete");
assert_ready_ok!(fut.as_mut().poll(cx));
}
handle_a.send_error("endpoint lost");
assert_not_ready!(svc, "must be not be ready");
assert_pending!(svc.poll_ready(cx));
assert_eq!(svc.len(), 1, "balancer must drop failed endpoints",);
});
}
fn with_task<F: FnOnce() -> U, U>(f: F) -> U {
use futures::future::lazy;
lazy(|| Ok::<_, ()>(f())).wait().unwrap()
}

View File

@ -2,7 +2,7 @@
//!
//! The pool uses `poll_ready` as a signal indicating whether additional services should be spawned
//! to handle the current level of load. Specifically, every time `poll_ready` on the inner service
//! returns `Ready`, [`Pool`] consider that a 0, and every time it returns `NotReady`, [`Pool`]
//! returns `Ready`, [`Pool`] consider that a 0, and every time it returns `Pending`, [`Pool`]
//! considers it a 1. [`Pool`] then maintains an [exponential moving
//! average](https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average) over those
//! samples, which gives an estimate of how often the underlying service has been ready when it was
@ -16,12 +16,18 @@
use super::p2c::Balance;
use crate::error;
use futures::{try_ready, Async, Future, Poll, Stream};
use futures_core::{ready, Stream};
use pin_project::pin_project;
use slab::Slab;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tower_discover::{Change, Discover};
use tower_load::Load;
use tower_make::MakeService;
use tower_service::Service;
use tower_util::MakeService;
#[cfg(test)]
mod test;
@ -36,6 +42,7 @@ enum Level {
High,
}
#[pin_project]
/// A wrapper around `MakeService` that discovers a new service when load is high, and removes a
/// service when load is low. See [`Pool`].
pub struct PoolDiscoverer<MS, Target, Request>
@ -43,11 +50,13 @@ where
MS: MakeService<Target, Request>,
{
maker: MS,
#[pin]
making: Option<MS::Future>,
target: Target,
load: Level,
services: Slab<()>,
died_tx: tokio_sync::mpsc::UnboundedSender<usize>,
#[pin]
died_rx: tokio_sync::mpsc::UnboundedReceiver<usize>,
limit: Option<usize>,
}
@ -63,83 +72,84 @@ where
type Service = DropNotifyService<MS::Service>;
type Error = MS::MakeError;
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::Error> {
while let Async::Ready(Some(sid)) = self
.died_rx
.poll()
.expect("cannot be closed as we hold tx too")
{
self.services.remove(sid);
fn poll_discover(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
let mut this = self.project();
while let Poll::Ready(Some(sid)) = this.died_rx.as_mut().poll_next(cx) {
this.services.remove(sid);
tracing::trace!(
pool.services = self.services.len(),
pool.services = this.services.len(),
message = "removing dropped service"
);
}
if self.services.len() == 0 && self.making.is_none() {
let _ = try_ready!(self.maker.poll_ready());
if this.services.len() == 0 && this.making.is_none() {
let _ = ready!(this.maker.poll_ready(cx))?;
tracing::trace!("construct initial pool connection");
self.making = Some(self.maker.make_service(self.target.clone()));
this.making
.set(Some(this.maker.make_service(this.target.clone())));
}
if let Level::High = self.load {
if self.making.is_none() {
if self
if let Level::High = this.load {
if this.making.is_none() {
if this
.limit
.map(|limit| self.services.len() >= limit)
.map(|limit| this.services.len() >= limit)
.unwrap_or(false)
{
return Ok(Async::NotReady);
return Poll::Pending;
}
tracing::trace!(
pool.services = self.services.len(),
pool.services = this.services.len(),
message = "decided to add service to loaded pool"
);
try_ready!(self.maker.poll_ready());
ready!(this.maker.poll_ready(cx))?;
tracing::trace!("making new service");
// TODO: it'd be great if we could avoid the clone here and use, say, &Target
self.making = Some(self.maker.make_service(self.target.clone()));
this.making
.set(Some(this.maker.make_service(this.target.clone())));
}
}
if let Some(mut fut) = self.making.take() {
if let Async::Ready(svc) = fut.poll()? {
let id = self.services.insert(());
let svc = DropNotifyService {
svc,
id,
notify: self.died_tx.clone(),
};
tracing::trace!(
pool.services = self.services.len(),
message = "finished creating new service"
);
self.load = Level::Normal;
return Ok(Async::Ready(Change::Insert(id, svc)));
} else {
self.making = Some(fut);
return Ok(Async::NotReady);
}
if let Some(fut) = this.making.as_mut().as_pin_mut() {
let svc = ready!(fut.poll(cx))?;
this.making.set(None);
let id = this.services.insert(());
let svc = DropNotifyService {
svc,
id,
notify: this.died_tx.clone(),
};
tracing::trace!(
pool.services = this.services.len(),
message = "finished creating new service"
);
*this.load = Level::Normal;
return Poll::Ready(Ok(Change::Insert(id, svc)));
}
match self.load {
match this.load {
Level::High => {
unreachable!("found high load but no Service being made");
}
Level::Normal => Ok(Async::NotReady),
Level::Low if self.services.len() == 1 => Ok(Async::NotReady),
Level::Normal => Poll::Pending,
Level::Low if this.services.len() == 1 => Poll::Pending,
Level::Low => {
self.load = Level::Normal;
*this.load = Level::Normal;
// NOTE: this is a little sad -- we'd prefer to kill short-living services
let rm = self.services.iter().next().unwrap().0;
let rm = this.services.iter().next().unwrap().0;
// note that we _don't_ remove from self.services here
// that'll happen automatically on drop
tracing::trace!(
pool.services = self.services.len(),
pool.services = this.services.len(),
message = "removing service for over-provisioned pool"
);
Ok(Async::Ready(Change::Remove(rm)))
Poll::Ready(Ok(Change::Remove(rm)))
}
}
}
@ -183,7 +193,7 @@ impl Builder {
/// threshold, and there are at least two services active, a service is removed.
///
/// The default value is 0.01. That is, when one in every 100 `poll_ready` calls return
/// `NotReady`, then the underlying service is considered underutilized.
/// `Pending`, then the underlying service is considered underutilized.
pub fn underutilized_below(&mut self, low: f64) -> &mut Self {
self.low = low;
self
@ -194,7 +204,7 @@ impl Builder {
/// scheduled to be added to the underlying [`Balance`].
///
/// The default value is 0.5. That is, when every other call to `poll_ready` returns
/// `NotReady`, then the underlying service is considered highly loaded.
/// `Pending`, then the underlying service is considered highly loaded.
pub fn loaded_above(&mut self, high: f64) -> &mut Self {
self.high = high;
self
@ -267,7 +277,7 @@ impl Builder {
};
Pool {
balance: Balance::from_entropy(d),
balance: Balance::from_entropy(Box::pin(d)),
options: *self,
ewma: self.init,
}
@ -282,7 +292,8 @@ where
MS::Error: Into<error::Error>,
Target: Clone,
{
balance: Balance<PoolDiscoverer<MS, Target, Request>, Request>,
// the Pin<Box<_>> here is needed since Balance requires the Service to be Unpin
balance: Balance<Pin<Box<PoolDiscoverer<MS, Target, Request>>>, Request>,
options: Builder,
ewma: f64,
}
@ -298,7 +309,7 @@ where
{
/// Construct a new dynamically sized `Pool`.
///
/// If many calls to `poll_ready` return `NotReady`, `new_service` is used to
/// If many calls to `poll_ready` return `Pending`, `new_service` is used to
/// construct another `Service` that is then added to the load-balanced pool.
/// If many calls to `poll_ready` succeed, the most recently added `Service`
/// is dropped from the pool.
@ -307,6 +318,8 @@ where
}
}
type PinBalance<S, Request> = Balance<Pin<Box<S>>, Request>;
impl<MS, Target, Req> Service<Req> for Pool<MS, Target, Req>
where
MS: MakeService<Target, Req>,
@ -316,48 +329,50 @@ where
MS::Error: Into<error::Error>,
Target: Clone,
{
type Response = <Balance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Response;
type Error = <Balance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Error;
type Future = <Balance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Future;
type Response = <PinBalance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Response;
type Error = <PinBalance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Error;
type Future = <PinBalance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
if let Async::Ready(()) = self.balance.poll_ready()? {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if let Poll::Ready(()) = self.balance.poll_ready(cx)? {
// services was ready -- there are enough services
// update ewma with a 0 sample
self.ewma = (1.0 - self.options.alpha) * self.ewma;
let discover = self.balance.discover_mut();
let mut discover = self.balance.discover_mut().as_mut();
let discover = discover.project();
if self.ewma < self.options.low {
if discover.load != Level::Low {
if *discover.load != Level::Low {
tracing::trace!({ ewma = %self.ewma }, "pool is over-provisioned");
}
discover.load = Level::Low;
*discover.load = Level::Low;
if discover.services.len() > 1 {
// reset EWMA so we don't immediately try to remove another service
self.ewma = self.options.init;
}
} else {
if discover.load != Level::Normal {
if *discover.load != Level::Normal {
tracing::trace!({ ewma = %self.ewma }, "pool is appropriately provisioned");
}
discover.load = Level::Normal;
*discover.load = Level::Normal;
}
return Ok(Async::Ready(()));
return Poll::Ready(Ok(()));
}
let discover = self.balance.discover_mut();
let mut discover = self.balance.discover_mut().as_mut();
let discover = discover.project();
if discover.making.is_none() {
// no services are ready -- we're overloaded
// update ewma with a 1 sample
self.ewma = self.options.alpha + (1.0 - self.options.alpha) * self.ewma;
if self.ewma > self.options.high {
if discover.load != Level::High {
if *discover.load != Level::High {
tracing::trace!({ ewma = %self.ewma }, "pool is under-provisioned");
}
discover.load = Level::High;
*discover.load = Level::High;
// don't reset the EWMA -- in theory, poll_ready should now start returning
// `Ready`, so we won't try to launch another service immediately.
@ -366,13 +381,13 @@ where
// we need to call balance again for PoolDiscover to realize
// it can make a new service
return self.balance.poll_ready();
return self.balance.poll_ready(cx);
} else {
discover.load = Level::Normal;
*discover.load = Level::Normal;
}
}
Ok(Async::NotReady)
Poll::Pending
}
fn call(&mut self, req: Req) -> Self::Future {
@ -405,8 +420,8 @@ impl<Request, Svc: Service<Request>> Service<Request> for DropNotifyService<Svc>
type Future = Svc::Future;
type Error = Svc::Error;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.svc.poll_ready()
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.svc.poll_ready(cx)
}
fn call(&mut self, req: Request) -> Self::Future {

View File

@ -1,285 +1,200 @@
use futures::{Async, Future};
use futures_util::pin_mut;
use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task};
use tower_load as load;
use tower_service::Service;
use tower_test::mock;
use tower_test::{assert_request_eq, mock};
use super::*;
macro_rules! assert_fut_ready {
($fut:expr, $val:expr) => {{
assert_fut_ready!($fut, $val, "must be ready");
}};
($fut:expr, $val:expr, $msg:expr) => {{
assert_eq!(
$fut.poll().expect("must not fail"),
Async::Ready($val),
$msg
);
}};
}
macro_rules! assert_ready {
($svc:expr) => {{
assert_ready!($svc, "must be ready");
}};
($svc:expr, $msg:expr) => {{
assert!($svc.poll_ready().expect("must not fail").is_ready(), $msg);
}};
}
macro_rules! assert_fut_not_ready {
($fut:expr) => {{
assert_fut_not_ready!($fut, "must not be ready");
}};
($fut:expr, $msg:expr) => {{
assert!(!$fut.poll().expect("must not fail").is_ready(), $msg);
}};
}
macro_rules! assert_not_ready {
($svc:expr) => {{
assert_not_ready!($svc, "must not be ready");
}};
($svc:expr, $msg:expr) => {{
assert!(!$svc.poll_ready().expect("must not fail").is_ready(), $msg);
}};
}
#[test]
fn basic() {
// start the pool
let (mock, mut handle) =
mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
let mut pool = Builder::new().build(mock, ());
with_task(|| {
assert_not_ready!(pool);
});
task::mock(|cx| {
// start the pool
let (mock, handle) =
mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
pin_mut!(handle);
// give the pool a backing service
let (svc1_m, mut svc1) = mock::pair();
handle
.next_request()
.unwrap()
.1
.send_response(load::Constant::new(svc1_m, 0));
with_task(|| {
assert_ready!(pool);
});
let mut pool = Builder::new().build(mock, ());
assert_pending!(pool.poll_ready(cx));
// send a request to the one backing service
let mut fut = pool.call(());
with_task(|| {
assert_fut_not_ready!(fut);
});
svc1.next_request().unwrap().1.send_response("foobar");
with_task(|| {
assert_fut_ready!(fut, "foobar");
// give the pool a backing service
let (svc1_m, svc1) = mock::pair();
pin_mut!(svc1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
assert_ready_ok!(pool.poll_ready(cx));
// send a request to the one backing service
let fut = pool.call(());
pin_mut!(fut);
assert_pending!(fut.as_mut().poll(cx));
assert_request_eq!(svc1, ()).send_response("foobar");
assert_eq!(assert_ready_ok!(fut.poll(cx)), "foobar");
});
}
#[test]
fn high_load() {
// start the pool
let (mock, mut handle) =
mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
let mut pool = Builder::new()
.urgency(1.0) // so _any_ NotReady will add a service
.underutilized_below(0.0) // so no Ready will remove a service
.max_services(Some(2))
.build(mock, ());
with_task(|| {
assert_not_ready!(pool);
});
task::mock(|cx| {
// start the pool
let (mock, handle) =
mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
pin_mut!(handle);
// give the pool a backing service
let (svc1_m, mut svc1) = mock::pair();
svc1.allow(1);
handle
.next_request()
.unwrap()
.1
.send_response(load::Constant::new(svc1_m, 0));
with_task(|| {
assert_ready!(pool);
});
let mut pool = Builder::new()
.urgency(1.0) // so _any_ Pending will add a service
.underutilized_below(0.0) // so no Ready will remove a service
.max_services(Some(2))
.build(mock, ());
assert_pending!(pool.poll_ready(cx));
// make the one backing service not ready
let mut fut1 = pool.call(());
// give the pool a backing service
let (svc1_m, svc1) = mock::pair();
pin_mut!(svc1);
// if we poll_ready again, pool should notice that load is increasing
// since urgency == 1.0, it should immediately enter high load
with_task(|| {
assert_not_ready!(pool);
});
// it should ask the maker for another service, so we give it one
let (svc2_m, mut svc2) = mock::pair();
svc2.allow(1);
handle
.next_request()
.unwrap()
.1
.send_response(load::Constant::new(svc2_m, 0));
svc1.allow(1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
assert_ready_ok!(pool.poll_ready(cx));
// the pool should now be ready again for one more request
with_task(|| {
assert_ready!(pool);
});
let mut fut2 = pool.call(());
with_task(|| {
assert_not_ready!(pool);
});
// make the one backing service not ready
let fut1 = pool.call(());
pin_mut!(fut1);
// the pool should _not_ try to add another service
// sicen we have max_services(2)
with_task(|| {
assert!(!handle.poll_request().unwrap().is_ready());
});
// if we poll_ready again, pool should notice that load is increasing
// since urgency == 1.0, it should immediately enter high load
assert_pending!(pool.poll_ready(cx));
// it should ask the maker for another service, so we give it one
let (svc2_m, svc2) = mock::pair();
pin_mut!(svc2);
// let see that each service got one request
svc1.next_request().unwrap().1.send_response("foo");
svc2.next_request().unwrap().1.send_response("bar");
with_task(|| {
assert_fut_ready!(fut1, "foo");
});
with_task(|| {
assert_fut_ready!(fut2, "bar");
svc2.allow(1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0));
// the pool should now be ready again for one more request
assert_ready_ok!(pool.poll_ready(cx));
let fut2 = pool.call(());
pin_mut!(fut2);
assert_pending!(pool.poll_ready(cx));
// the pool should _not_ try to add another service
// sicen we have max_services(2)
assert_pending!(handle.as_mut().poll_request(cx));
// let see that each service got one request
assert_request_eq!(svc1, ()).send_response("foo");
assert_request_eq!(svc2, ()).send_response("bar");
assert_eq!(assert_ready_ok!(fut1.poll(cx)), "foo");
assert_eq!(assert_ready_ok!(fut2.poll(cx)), "bar");
});
}
#[test]
fn low_load() {
// start the pool
let (mock, mut handle) =
mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
let mut pool = Builder::new()
.urgency(1.0) // so any event will change the service count
.build(mock, ());
with_task(|| {
assert_not_ready!(pool);
});
task::mock(|cx| {
// start the pool
let (mock, handle) =
mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
pin_mut!(handle);
// give the pool a backing service
let (svc1_m, mut svc1) = mock::pair();
svc1.allow(1);
handle
.next_request()
.unwrap()
.1
.send_response(load::Constant::new(svc1_m, 0));
with_task(|| {
assert_ready!(pool);
});
let mut pool = Builder::new()
.urgency(1.0) // so any event will change the service count
.build(mock, ());
assert_pending!(pool.poll_ready(cx));
// cycling a request should now work
let mut fut = pool.call(());
svc1.next_request().unwrap().1.send_response("foo");
with_task(|| {
assert_fut_ready!(fut, "foo");
});
// and pool should now not be ready (since svc1 isn't ready)
// it should immediately try to add another service
// which we give it
with_task(|| {
assert_not_ready!(pool);
});
let (svc2_m, mut svc2) = mock::pair();
svc2.allow(1);
handle
.next_request()
.unwrap()
.1
.send_response(load::Constant::new(svc2_m, 0));
// pool is now ready
// which (because of urgency == 1.0) should immediately cause it to drop a service
// it'll drop svc1, so it'll still be ready
with_task(|| {
assert_ready!(pool);
});
// and even with another ready, it won't drop svc2 since its now the only service
with_task(|| {
assert_ready!(pool);
});
// give the pool a backing service
let (svc1_m, svc1) = mock::pair();
pin_mut!(svc1);
// cycling a request should now work on svc2
let mut fut = pool.call(());
svc2.next_request().unwrap().1.send_response("foo");
with_task(|| {
assert_fut_ready!(fut, "foo");
});
svc1.allow(1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
assert_ready_ok!(pool.poll_ready(cx));
// and again (still svc2)
svc2.allow(1);
with_task(|| {
assert_ready!(pool);
});
let mut fut = pool.call(());
svc2.next_request().unwrap().1.send_response("foo");
with_task(|| {
assert_fut_ready!(fut, "foo");
// cycling a request should now work
let fut = pool.call(());
pin_mut!(fut);
assert_request_eq!(svc1, ()).send_response("foo");
assert_eq!(assert_ready_ok!(fut.poll(cx)), "foo");
// and pool should now not be ready (since svc1 isn't ready)
// it should immediately try to add another service
// which we give it
assert_pending!(pool.poll_ready(cx));
let (svc2_m, svc2) = mock::pair();
pin_mut!(svc2);
svc2.allow(1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0));
// pool is now ready
// which (because of urgency == 1.0) should immediately cause it to drop a service
// it'll drop svc1, so it'll still be ready
assert_ready_ok!(pool.poll_ready(cx));
// and even with another ready, it won't drop svc2 since its now the only service
assert_ready_ok!(pool.poll_ready(cx));
// cycling a request should now work on svc2
let fut = pool.call(());
pin_mut!(fut);
assert_request_eq!(svc2, ()).send_response("foo");
assert_eq!(assert_ready_ok!(fut.poll(cx)), "foo");
// and again (still svc2)
svc2.allow(1);
assert_ready_ok!(pool.poll_ready(cx));
let fut = pool.call(());
pin_mut!(fut);
assert_request_eq!(svc2, ()).send_response("foo");
assert_eq!(assert_ready_ok!(fut.poll(cx)), "foo");
});
}
#[test]
fn failing_service() {
// start the pool
let (mock, mut handle) =
mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
let mut pool = Builder::new()
.urgency(1.0) // so _any_ NotReady will add a service
.underutilized_below(0.0) // so no Ready will remove a service
.build(mock, ());
with_task(|| {
assert_not_ready!(pool);
});
task::mock(|cx| {
// start the pool
let (mock, handle) =
mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
pin_mut!(handle);
// give the pool a backing service
let (svc1_m, mut svc1) = mock::pair();
svc1.allow(1);
handle
.next_request()
.unwrap()
.1
.send_response(load::Constant::new(svc1_m, 0));
with_task(|| {
assert_ready!(pool);
});
let mut pool = Builder::new()
.urgency(1.0) // so _any_ Pending will add a service
.underutilized_below(0.0) // so no Ready will remove a service
.build(mock, ());
assert_pending!(pool.poll_ready(cx));
// one request-response cycle
let mut fut = pool.call(());
svc1.next_request().unwrap().1.send_response("foo");
with_task(|| {
assert_fut_ready!(fut, "foo");
});
// give the pool a backing service
let (svc1_m, svc1) = mock::pair();
pin_mut!(svc1);
// now make svc1 fail, so it has to be removed
svc1.send_error("ouch");
// polling now should recognize the failed service,
// try to create a new one, and then realize the maker isn't ready
with_task(|| {
assert_not_ready!(pool);
});
// then we release another service
let (svc2_m, mut svc2) = mock::pair();
svc2.allow(1);
handle
.next_request()
.unwrap()
.1
.send_response(load::Constant::new(svc2_m, 0));
svc1.allow(1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
assert_ready_ok!(pool.poll_ready(cx));
// the pool should now be ready again
with_task(|| {
assert_ready!(pool);
});
// and a cycle should work (and go through svc2)
let mut fut = pool.call(());
svc2.next_request().unwrap().1.send_response("bar");
with_task(|| {
assert_fut_ready!(fut, "bar");
// one request-response cycle
let fut = pool.call(());
pin_mut!(fut);
assert_request_eq!(svc1, ()).send_response("foo");
assert_eq!(assert_ready_ok!(fut.poll(cx)), "foo");
// now make svc1 fail, so it has to be removed
svc1.send_error("ouch");
// polling now should recognize the failed service,
// try to create a new one, and then realize the maker isn't ready
assert_pending!(pool.poll_ready(cx));
// then we release another service
let (svc2_m, svc2) = mock::pair();
pin_mut!(svc2);
svc2.allow(1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0));
// the pool should now be ready again
assert_ready_ok!(pool.poll_ready(cx));
// and a cycle should work (and go through svc2)
let fut = pool.call(());
pin_mut!(fut);
assert_request_eq!(svc2, ()).send_response("bar");
assert_eq!(assert_ready_ok!(fut.poll(cx)), "bar");
});
}
fn with_task<F: FnOnce() -> U, U>(f: F) -> U {
use futures::future::lazy;
lazy(|| Ok::<_, ()>(f())).wait().unwrap()
}

View File

@ -16,6 +16,7 @@ mod stream;
pub use crate::{list::ServiceList, stream::ServiceStream};
use std::hash::Hash;
use std::ops;
use std::{
pin::Pin,
task::{Context, Poll},
@ -43,6 +44,49 @@ pub trait Discover {
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>>;
}
// delegate through Pin
impl<P> Discover for Pin<P>
where
P: Unpin + ops::DerefMut,
P::Target: Discover,
{
type Key = <<P as ops::Deref>::Target as Discover>::Key;
type Service = <<P as ops::Deref>::Target as Discover>::Service;
type Error = <<P as ops::Deref>::Target as Discover>::Error;
fn poll_discover(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
Pin::get_mut(self).as_mut().poll_discover(cx)
}
}
impl<D: ?Sized + Discover + Unpin> Discover for &mut D {
type Key = D::Key;
type Service = D::Service;
type Error = D::Error;
fn poll_discover(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
Discover::poll_discover(Pin::new(&mut **self), cx)
}
}
impl<D: ?Sized + Discover + Unpin> Discover for Box<D> {
type Key = D::Key;
type Service = D::Service;
type Error = D::Error;
fn poll_discover(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
D::poll_discover(Pin::new(&mut *self), cx)
}
}
/// A change in the service set
pub enum Change<K, V> {
Insert(K, V),

View File

@ -13,6 +13,7 @@ use crate::Load;
/// Wraps a type so that `Load::load` returns a constant value.
#[pin_project]
#[derive(Debug)]
pub struct Constant<T, M> {
inner: T,
load: M,