From 87976ae4184a988f8631f38088fa201710faffed Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Tue, 10 Sep 2019 18:15:32 -0400 Subject: [PATCH] Update tower-balance to std::future (#335) This bumps tower-balance to 0.3.0-alpha.1 It also adds delegate impls for `Discover` through `Pin`, and makes `tower-load::Constant: Debug`. --- Cargo.toml | 2 +- tower-balance/Cargo.toml | 38 +-- tower-balance/examples/demo.rs | 101 ++++---- tower-balance/src/lib.rs | 2 +- tower-balance/src/p2c/make.rs | 34 ++- tower-balance/src/p2c/service.rs | 105 +++++--- tower-balance/src/p2c/test.rs | 120 +++++---- tower-balance/src/pool/mod.rs | 157 ++++++------ tower-balance/src/pool/test.rs | 405 ++++++++++++------------------- tower-discover/src/lib.rs | 44 ++++ tower-load/src/constant.rs | 1 + 11 files changed, 507 insertions(+), 502 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a0fcd882..b00d8286 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ members = [ "tower", - # "tower-balance", + "tower-balance", "tower-buffer", "tower-discover", "tower-filter", diff --git a/tower-balance/Cargo.toml b/tower-balance/Cargo.toml index 4db8f662..0650905d 100644 --- a/tower-balance/Cargo.toml +++ b/tower-balance/Cargo.toml @@ -8,13 +8,13 @@ name = "tower-balance" # - README.md # - Update CHANGELOG.md. # - Create "v0.1.x" git tag. -version = "0.1.0" +version = "0.3.0-alpha.1" authors = ["Tower Maintainers "] license = "MIT" readme = "README.md" repository = "https://github.com/tower-rs/tower" homepage = "https://github.com/tower-rs/tower" -documentation = "https://docs.rs/tower-balance/0.1.0" +documentation = "https://docs.rs/tower-balance/0.3.0-alpha.1" description = """ Balance load across a set of uniform services. """ @@ -27,26 +27,30 @@ log = ["tracing/log"] default = ["log"] [dependencies] -futures = "0.1.26" +futures-util-preview = "0.3.0-alpha.18" +futures-core-preview = "0.3.0-alpha.18" +pin-project = "0.4.0-alpha.10" indexmap = "1.0.2" tracing = "0.1" rand = "0.6.5" -tokio-sync = "0.1.3" -tokio-timer = "0.2.4" -tower-discover = "0.1.0" -tower-layer = "0.1.0" -tower-load = { version = "0.1.0", path = "../tower-load" } -tower-service = "0.2.0" -tower-util = "0.1.0" +tokio-sync = "0.2.0-alpha.4" +tokio-timer = "0.3.0-alpha.4" +tower-discover = { version = "0.3.0-alpha.1", path = "../tower-discover" } +tower-layer = { version = "0.3.0-alpha.1", path = "../tower-layer" } +tower-load = { version = "0.3.0-alpha.1", path = "../tower-load" } +tower-service = "0.3.0-alpha.1" +tower-make = { version = "0.3.0-alpha.1", path = "../tower-make" } +tower-util = { version = "0.3.0-alpha.1", path = "../tower-util" } slab = "0.4" [dev-dependencies] -tracing-fmt = { git = "https://github.com/tokio-rs/tracing.git" } +tracing-subscriber = "0.1.1" hdrhistogram = "6.0" quickcheck = { version = "0.6", default-features = false } -tokio = "0.1.7" -tokio-executor = "0.1.2" -tower = { version = "*", path = "../tower" } -tower-buffer = { version = "*", path = "../tower-buffer" } -tower-limit = { version = "*", path = "../tower-limit" } -tower-test = { version = "*", path = "../tower-test" } +tokio = "0.2.0-alpha.4" +tokio-executor = "0.2.0-alpha.4" +tokio-test = "0.2.0-alpha.4" +tower-buffer = { version = "0.3.0-alpha.1", path = "../tower-buffer" } +tower-limit = { version = "0.3.0-alpha.1", path = "../tower-limit" } +tower-test = { version = "0.3.0-alpha.1", path = "../tower-test" } +tower = { version = "0.3.0-alpha.1", path = "../tower" } diff --git a/tower-balance/examples/demo.rs b/tower-balance/examples/demo.rs index 8d989d9f..399e53bb 100644 --- a/tower-balance/examples/demo.rs +++ b/tower-balance/examples/demo.rs @@ -1,17 +1,22 @@ //! Exercises load balancers with mocked services. -use futures::{future, stream, Async, Future, Poll, Stream}; +use futures_core::TryStream; +use futures_util::{stream, stream::StreamExt, try_stream::TryStreamExt}; use hdrhistogram::Histogram; +use pin_project::pin_project; use rand::{self, Rng}; use std::time::{Duration, Instant}; -use tokio::{runtime, timer}; -use tower::{ - discover::{Change, Discover}, - limit::concurrency::ConcurrencyLimit, - Service, ServiceExt, +use std::{ + pin::Pin, + task::{Context, Poll}, }; +use tokio::timer; +use tower::util::ServiceExt; use tower_balance as lb; +use tower_discover::{Change, Discover}; +use tower_limit::concurrency::ConcurrencyLimit; use tower_load as load; +use tower_service::Service; const REQUESTS: usize = 100_000; const CONCURRENCY: usize = 500; @@ -36,8 +41,9 @@ struct Summary { count_by_instance: [usize; 10], } -fn main() { - tracing::subscriber::set_global_default(tracing_fmt::FmtSubscriber::default()).unwrap(); +#[tokio::main] +async fn main() { + tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::default()).unwrap(); println!("REQUESTS={}", REQUESTS); println!("CONCURRENCY={}", CONCURRENCY); @@ -49,37 +55,27 @@ fn main() { } println!("]"); - let mut rt = runtime::Runtime::new().unwrap(); + let decay = Duration::from_secs(10); + let d = gen_disco(); + let pe = lb::p2c::Balance::from_entropy(load::PeakEwmaDiscover::new( + d, + DEFAULT_RTT, + decay, + load::NoInstrument, + )); + run("P2C+PeakEWMA...", pe).await; - let fut = future::lazy(move || { - let decay = Duration::from_secs(10); - let d = gen_disco(); - let pe = lb::p2c::Balance::from_entropy(load::PeakEwmaDiscover::new( - d, - DEFAULT_RTT, - decay, - load::NoInstrument, - )); - run("P2C+PeakEWMA...", pe) - }); - - let fut = fut.then(move |_| { - let d = gen_disco(); - let ll = lb::p2c::Balance::from_entropy(load::PendingRequestsDiscover::new( - d, - load::NoInstrument, - )); - run("P2C+LeastLoaded...", ll) - }); - - rt.spawn(fut); - rt.shutdown_on_idle().wait().unwrap(); + let d = gen_disco(); + let ll = + lb::p2c::Balance::from_entropy(load::PendingRequestsDiscover::new(d, load::NoInstrument)); + run("P2C+LeastLoaded...", ll).await; } type Error = Box; type Key = usize; +#[pin_project] struct Disco(Vec<(Key, S)>); impl Discover for Disco @@ -89,10 +85,13 @@ where type Key = Key; type Service = S; type Error = Error; - fn poll(&mut self) -> Poll, Self::Error> { - match self.0.pop() { - Some((k, service)) => Ok(Change::Insert(k, service).into()), - None => Ok(Async::NotReady), + fn poll_discover( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll, Self::Error>> { + match self.project().0.pop() { + Some((k, service)) => Poll::Ready(Ok(Change::Insert(k, service))), + None => Poll::Pending, } } } @@ -116,12 +115,11 @@ fn gen_disco() -> impl Discover< .saturating_add(latency.as_secs().saturating_mul(1_000)); let latency = Duration::from_millis(rand::thread_rng().gen_range(0, maxms)); - timer::Delay::new(start + latency) - .map_err(Into::into) - .map(move |_| { - let latency = start.elapsed(); - Rsp { latency, instance } - }) + async move { + timer::delay(start + latency).await; + let latency = start.elapsed(); + Ok(Rsp { latency, instance }) + } }); (instance, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY)) @@ -130,9 +128,9 @@ fn gen_disco() -> impl Discover< ) } -fn run(name: &'static str, lb: lb::p2c::Balance) -> impl Future +async fn run(name: &'static str, lb: lb::p2c::Balance) where - D: Discover + Send + 'static, + D: Discover + Unpin + Send + 'static, D::Error: Into, D::Key: Clone + Send, D::Service: Service + load::Load + Send, @@ -142,21 +140,22 @@ where { println!("{}", name); - let requests = stream::repeat::<_, Error>(Req).take(REQUESTS as u64); + let requests = stream::repeat(Req).take(REQUESTS as u64); let service = ConcurrencyLimit::new(lb, CONCURRENCY); let responses = service.call_all(requests).unordered(); - compute_histo(responses).map(|s| s.report()).map_err(|_| {}) + compute_histo(responses).await.unwrap().report(); } -fn compute_histo(times: S) -> impl Future + 'static +async fn compute_histo(mut times: S) -> Result where - S: Stream + 'static, + S: TryStream + 'static + Unpin, { - times.fold(Summary::new(), |mut summary, rsp| { + let mut summary = Summary::new(); + while let Some(rsp) = times.try_next().await? { summary.count(rsp); - Ok(summary) as Result<_, Error> - }) + } + Ok(summary) } impl Summary { diff --git a/tower-balance/src/lib.rs b/tower-balance/src/lib.rs index 131004d7..cc4c81e6 100644 --- a/tower-balance/src/lib.rs +++ b/tower-balance/src/lib.rs @@ -1,6 +1,6 @@ //! Load balancing middlewares. -#![doc(html_root_url = "https://docs.rs/tower-balance/0.1.0")] +#![doc(html_root_url = "https://docs.rs/tower-balance/0.3.0-alpha.1")] #![deny(missing_docs)] #![deny(rust_2018_idioms)] #![allow(elided_lifetimes_in_paths)] diff --git a/tower-balance/src/p2c/make.rs b/tower-balance/src/p2c/make.rs index dd3c5beb..85386f8f 100644 --- a/tower-balance/src/p2c/make.rs +++ b/tower-balance/src/p2c/make.rs @@ -1,7 +1,13 @@ use super::Balance; -use futures::{try_ready, Future, Poll}; +use futures_core::ready; +use pin_project::pin_project; use rand::{rngs::SmallRng, FromEntropy}; use std::marker::PhantomData; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; use tower_discover::Discover; use tower_service::Service; @@ -13,8 +19,10 @@ pub struct BalanceMake { _marker: PhantomData, } +#[pin_project] /// Makes a balancer instance. pub struct MakeFuture { + #[pin] inner: F, rng: SmallRng, _marker: PhantomData, @@ -45,8 +53,8 @@ where type Error = S::Error; type Future = MakeFuture; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner.poll_ready() + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) } fn call(&mut self, target: Target) -> Self::Future { @@ -58,18 +66,18 @@ where } } -impl Future for MakeFuture +impl Future for MakeFuture where - F: Future, - F::Item: Discover, - ::Service: Service, + F: Future>, + T: Discover, + ::Service: Service, { - type Item = Balance; - type Error = F::Error; + type Output = Result, E>; - fn poll(&mut self) -> Poll { - let inner = try_ready!(self.inner.poll()); - let svc = Balance::new(inner, self.rng.clone()); - Ok(svc.into()) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let inner = ready!(this.inner.poll(cx))?; + let svc = Balance::new(inner, this.rng.clone()); + Poll::Ready(Ok(svc)) } } diff --git a/tower-balance/src/p2c/service.rs b/tower-balance/src/p2c/service.rs index b9a99378..ab84d119 100644 --- a/tower-balance/src/p2c/service.rs +++ b/tower-balance/src/p2c/service.rs @@ -1,7 +1,14 @@ use crate::error; -use futures::{future, stream, try_ready, Async, Future, Poll, Stream}; +use futures_core::{ready, Stream}; +use futures_util::{stream, try_future, try_future::TryFutureExt}; use indexmap::IndexMap; +use pin_project::pin_project; use rand::{rngs::SmallRng, FromEntropy}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; use tokio_sync::oneshot; use tower_discover::{Change, Discover}; use tower_load::Load; @@ -20,8 +27,15 @@ use tracing::{debug, trace}; /// > The maximum load variance between any two servers is bound by `ln(ln(n))` where /// > `n` is the number of servers in the cluster. /// +/// Note that `Balance` requires that the `Discover` you use is `Unpin` in order to implement +/// `Service`. This is because it needs to be accessed from `Service::poll_ready`, which takes +/// `&mut self`. You can achieve this easily by wrapping your `Discover` in [`Box::pin`] before you +/// construct the `Balance` instance. For more details, see [#319]. +/// /// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded /// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf +/// [`Box::pin`]: https://doc.rust-lang.org/std/boxed/struct.Box.html#method.pin +/// [#319]: https://github.com/tower-rs/tower/issues/319 #[derive(Debug)] pub struct Balance { discover: D, @@ -38,13 +52,16 @@ pub struct Balance { rng: SmallRng, } +#[pin_project] /// A Future that becomes satisfied when an `S`-typed service is ready. /// /// May fail due to cancelation, i.e. if the service is removed from discovery. #[derive(Debug)] struct UnreadyService { key: Option, + #[pin] cancel: oneshot::Receiver<()>, + #[pin] ready: tower_util::Ready, } @@ -88,7 +105,7 @@ where impl Balance where - D: Discover, + D: Discover + Unpin, D::Key: Clone, D::Error: Into, D::Service: Service + Load, @@ -98,10 +115,12 @@ where /// Polls `discover` for updates, adding new items to `not_ready`. /// /// Removals may alter the order of either `ready` or `not_ready`. - fn poll_discover(&mut self) -> Poll<(), error::Discover> { + fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll> { debug!("updating from discover"); loop { - match try_ready!(self.discover.poll().map_err(|e| error::Discover(e.into()))) { + match ready!(Pin::new(&mut self.discover).poll_discover(cx)) + .map_err(|e| error::Discover(e.into()))? + { Change::Remove(key) => { trace!("remove"); self.evict(&key) @@ -132,23 +151,25 @@ where .next_ready_index .and_then(|i| Self::repair_index(i, idx, self.ready_services.len())); debug_assert!(!self.cancelations.contains_key(key)); - } else if let Some(cancel) = self.cancelations.remove(key) { + } else if let Some(cancel) = self.cancelations.swap_remove(key) { let _ = cancel.send(()); } } - fn poll_unready(&mut self) { + fn poll_unready(&mut self, cx: &mut Context<'_>) { loop { - match self.unready_services.poll() { - Ok(Async::NotReady) | Ok(Async::Ready(None)) => return, - Ok(Async::Ready(Some((key, svc)))) => { + match Pin::new(&mut self.unready_services).poll_next(cx) { + Poll::Pending | Poll::Ready(None) => return, + Poll::Ready(Some(Ok((key, svc)))) => { trace!("endpoint ready"); - let _cancel = self.cancelations.remove(&key); + let _cancel = self.cancelations.swap_remove(&key); debug_assert!(_cancel.is_some(), "missing cancelation"); self.ready_services.insert(key, svc); } - Err((key, Error::Canceled)) => debug_assert!(!self.cancelations.contains_key(&key)), - Err((key, Error::Inner(e))) => { + Poll::Ready(Some(Err((key, Error::Canceled)))) => { + debug_assert!(!self.cancelations.contains_key(&key)) + } + Poll::Ready(Some(Err((key, Error::Inner(e))))) => { let error = e.into(); debug!({ %error }, "dropping failed endpoint"); let _cancel = self.cancelations.swap_remove(&key); @@ -207,31 +228,35 @@ where svc.load() } - fn poll_ready_index_or_evict(&mut self, index: usize) -> Poll<(), ()> { + fn poll_ready_index_or_evict( + &mut self, + cx: &mut Context<'_>, + index: usize, + ) -> Poll> { let (_, svc) = self .ready_services .get_index_mut(index) .expect("invalid index"); - match svc.poll_ready() { - Ok(Async::Ready(())) => Ok(Async::Ready(())), - Ok(Async::NotReady) => { + match svc.poll_ready(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Pending => { // became unready; so move it back there. let (key, svc) = self .ready_services .swap_remove_index(index) .expect("invalid ready index"); self.push_unready(key, svc); - Ok(Async::NotReady) + Poll::Pending } - Err(e) => { + Poll::Ready(Err(e)) => { // failed, so drop it. let error = e.into(); debug!({ %error }, "evicting failed endpoint"); self.ready_services .swap_remove_index(index) .expect("invalid ready index"); - Err(()) + Poll::Ready(Err(())) } } } @@ -239,7 +264,7 @@ where impl Service for Balance where - D: Discover, + D: Discover + Unpin, D::Key: Clone, D::Error: Into, D::Service: Service + Load, @@ -248,7 +273,7 @@ where { type Response = >::Response; type Error = error::Error; - type Future = future::MapErr< + type Future = try_future::MapErr< >::Future, fn(>::Error) -> error::Error, >; @@ -257,13 +282,13 @@ where /// /// When `Async::Ready` is returned, `ready_index` is set with a valid index /// into `ready` referring to a `Service` that is ready to disptach a request. - fn poll_ready(&mut self) -> Poll<(), Self::Error> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { // First and foremost, process discovery updates. This removes or updates a // previously-selected `ready_index` if appropriate. - self.poll_discover()?; + let _ = self.poll_discover(cx)?; // Drive new or busy services to readiness. - self.poll_unready(); + self.poll_unready(cx); trace!({ nready = self.ready_services.len(), nunready = self.unready_services.len() }, "poll_ready"); loop { @@ -276,8 +301,8 @@ where trace!({ next.idx = index }, "preselected ready_index"); debug_assert!(index < self.ready_services.len()); - if let Ok(Async::Ready(())) = self.poll_ready_index_or_evict(index) { - return Ok(Async::Ready(())); + if let Poll::Ready(Ok(())) = self.poll_ready_index_or_evict(cx, index) { + return Poll::Ready(Ok(())); } self.next_ready_index = None; @@ -286,7 +311,7 @@ where self.next_ready_index = self.p2c_next_ready_index(); if self.next_ready_index.is_none() { debug_assert!(self.ready_services.is_empty()); - return Ok(Async::NotReady); + return Poll::Pending; } } } @@ -307,24 +332,24 @@ where } impl, Req> Future for UnreadyService { - type Item = (K, S); - type Error = (K, Error); + type Output = Result<(K, S), (K, Error)>; - fn poll(&mut self) -> Poll { - if let Ok(Async::Ready(())) = self.cancel.poll() { - let key = self.key.take().expect("polled after ready"); - return Err((key, Error::Canceled)); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + if let Poll::Ready(Ok(())) = this.cancel.poll(cx) { + let key = this.key.take().expect("polled after ready"); + return Poll::Ready(Err((key, Error::Canceled))); } - match self.ready.poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(svc)) => { - let key = self.key.take().expect("polled after ready"); - Ok((key, svc).into()) + match ready!(this.ready.poll(cx)) { + Ok(svc) => { + let key = this.key.take().expect("polled after ready"); + Poll::Ready(Ok((key, svc))) } Err(e) => { - let key = self.key.take().expect("polled after ready"); - Err((key, Error::Inner(e))) + let key = this.key.take().expect("polled after ready"); + Poll::Ready(Err((key, Error::Inner(e)))) } } } diff --git a/tower-balance/src/p2c/test.rs b/tower-balance/src/p2c/test.rs index 0c4e9dab..a5659575 100644 --- a/tower-balance/src/p2c/test.rs +++ b/tower-balance/src/p2c/test.rs @@ -1,128 +1,122 @@ -use futures::{Async, Future}; +use futures_util::pin_mut; +use std::{future::Future, task::Poll}; +use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task}; use tower_discover::ServiceList; use tower_load as load; use tower_service::Service; -use tower_test::mock; +use tower_test::{assert_request_eq, mock}; use super::*; -macro_rules! assert_ready { - ($svc:expr) => {{ - assert_ready!($svc, "must be ready"); - }}; - ($svc:expr, $msg:expr) => {{ - assert!($svc.poll_ready().expect("must not fail").is_ready(), $msg); - }}; -} - -macro_rules! assert_not_ready { - ($svc:expr) => {{ - assert_not_ready!($svc, "must not be ready"); - }}; - ($svc:expr, $msg:expr) => {{ - assert!(!$svc.poll_ready().expect("must not fail").is_ready(), $msg); - }}; -} - #[test] fn empty() { - let empty: Vec, usize>> = vec![]; - let disco = ServiceList::new(empty); - let mut svc = Balance::from_entropy(disco); - with_task(|| { - assert_not_ready!(svc); - }) + task::mock(|cx| { + let empty: Vec, usize>> = vec![]; + let disco = ServiceList::new(empty); + let mut svc = Balance::from_entropy(disco); + assert_pending!(svc.poll_ready(cx)); + }); } #[test] fn single_endpoint() { - let (mock, mut handle) = mock::pair(); - let mock = load::Constant::new(mock, 0); + task::mock(|cx| { + let (mock, handle) = mock::pair(); + pin_mut!(handle); - let disco = ServiceList::new(vec![mock].into_iter()); - let mut svc = Balance::from_entropy(disco); + let mock = load::Constant::new(mock, 0); + + let disco = ServiceList::new(vec![mock].into_iter()); + let mut svc = Balance::from_entropy(disco); - with_task(|| { handle.allow(0); - assert_not_ready!(svc); + assert_pending!(svc.poll_ready(cx)); assert_eq!(svc.len(), 1, "balancer must have discovered endpoint"); handle.allow(1); - assert_ready!(svc); + assert_ready_ok!(svc.poll_ready(cx)); let fut = svc.call(()); + pin_mut!(fut); - let ((), rsp) = handle.next_request().unwrap(); - rsp.send_response(1); + assert_request_eq!(handle, ()).send_response(1); - assert_eq!(fut.wait().expect("call must complete"), 1); + assert_eq!(assert_ready_ok!(fut.poll(cx)), 1); handle.allow(1); - assert_ready!(svc); + assert_ready_ok!(svc.poll_ready(cx)); handle.send_error("endpoint lost"); - assert_not_ready!(svc); + assert_pending!(svc.poll_ready(cx)); assert!(svc.len() == 0, "balancer must drop failed endpoints"); }); } #[test] fn two_endpoints_with_equal_load() { - let (mock_a, mut handle_a) = mock::pair(); - let (mock_b, mut handle_b) = mock::pair(); - let mock_a = load::Constant::new(mock_a, 1); - let mock_b = load::Constant::new(mock_b, 1); + task::mock(|cx| { + let (mock_a, handle_a) = mock::pair(); + let (mock_b, handle_b) = mock::pair(); + let mock_a = load::Constant::new(mock_a, 1); + let mock_b = load::Constant::new(mock_b, 1); - let disco = ServiceList::new(vec![mock_a, mock_b].into_iter()); - let mut svc = Balance::from_entropy(disco); + pin_mut!(handle_a); + pin_mut!(handle_b); + + let disco = ServiceList::new(vec![mock_a, mock_b].into_iter()); + let mut svc = Balance::from_entropy(disco); - with_task(|| { handle_a.allow(0); handle_b.allow(0); - assert_not_ready!(svc); + assert_pending!(svc.poll_ready(cx)); assert_eq!(svc.len(), 2, "balancer must have discovered both endpoints"); handle_a.allow(1); handle_b.allow(0); - assert_ready!(svc, "must be ready when one of two services is ready"); + assert_ready_ok!( + svc.poll_ready(cx), + "must be ready when one of two services is ready" + ); { let fut = svc.call(()); - let ((), rsp) = handle_a.next_request().unwrap(); - rsp.send_response("a"); - assert_eq!(fut.wait().expect("call must complete"), "a"); + pin_mut!(fut); + assert_request_eq!(handle_a, ()).send_response("a"); + assert_eq!(assert_ready_ok!(fut.poll(cx)), "a"); } handle_a.allow(0); handle_b.allow(1); - assert_ready!(svc, "must be ready when both endpoints are ready"); + assert_ready_ok!( + svc.poll_ready(cx), + "must be ready when both endpoints are ready" + ); { let fut = svc.call(()); - let ((), rsp) = handle_b.next_request().unwrap(); - rsp.send_response("b"); - assert_eq!(fut.wait().expect("call must complete"), "b"); + pin_mut!(fut); + assert_request_eq!(handle_b, ()).send_response("b"); + assert_eq!(assert_ready_ok!(fut.poll(cx)), "b"); } handle_a.allow(1); handle_b.allow(1); for _ in 0..2 { - assert_ready!(svc, "must be ready when both endpoints are ready"); + assert_ready_ok!( + svc.poll_ready(cx), + "must be ready when both endpoints are ready" + ); let fut = svc.call(()); + pin_mut!(fut); for (ref mut h, c) in &mut [(&mut handle_a, "a"), (&mut handle_b, "b")] { - if let Async::Ready(Some((_, tx))) = h.poll_request().unwrap() { + if let Poll::Ready(Some((_, tx))) = h.as_mut().poll_request(cx) { tracing::info!("using {}", c); tx.send_response(c); h.allow(0); } } - fut.wait().expect("call must complete"); + assert_ready_ok!(fut.as_mut().poll(cx)); } handle_a.send_error("endpoint lost"); - assert_not_ready!(svc, "must be not be ready"); + assert_pending!(svc.poll_ready(cx)); assert_eq!(svc.len(), 1, "balancer must drop failed endpoints",); }); } - -fn with_task U, U>(f: F) -> U { - use futures::future::lazy; - lazy(|| Ok::<_, ()>(f())).wait().unwrap() -} diff --git a/tower-balance/src/pool/mod.rs b/tower-balance/src/pool/mod.rs index d3420f55..d08730a0 100644 --- a/tower-balance/src/pool/mod.rs +++ b/tower-balance/src/pool/mod.rs @@ -2,7 +2,7 @@ //! //! The pool uses `poll_ready` as a signal indicating whether additional services should be spawned //! to handle the current level of load. Specifically, every time `poll_ready` on the inner service -//! returns `Ready`, [`Pool`] consider that a 0, and every time it returns `NotReady`, [`Pool`] +//! returns `Ready`, [`Pool`] consider that a 0, and every time it returns `Pending`, [`Pool`] //! considers it a 1. [`Pool`] then maintains an [exponential moving //! average](https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average) over those //! samples, which gives an estimate of how often the underlying service has been ready when it was @@ -16,12 +16,18 @@ use super::p2c::Balance; use crate::error; -use futures::{try_ready, Async, Future, Poll, Stream}; +use futures_core::{ready, Stream}; +use pin_project::pin_project; use slab::Slab; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; use tower_discover::{Change, Discover}; use tower_load::Load; +use tower_make::MakeService; use tower_service::Service; -use tower_util::MakeService; #[cfg(test)] mod test; @@ -36,6 +42,7 @@ enum Level { High, } +#[pin_project] /// A wrapper around `MakeService` that discovers a new service when load is high, and removes a /// service when load is low. See [`Pool`]. pub struct PoolDiscoverer @@ -43,11 +50,13 @@ where MS: MakeService, { maker: MS, + #[pin] making: Option, target: Target, load: Level, services: Slab<()>, died_tx: tokio_sync::mpsc::UnboundedSender, + #[pin] died_rx: tokio_sync::mpsc::UnboundedReceiver, limit: Option, } @@ -63,83 +72,84 @@ where type Service = DropNotifyService; type Error = MS::MakeError; - fn poll(&mut self) -> Poll, Self::Error> { - while let Async::Ready(Some(sid)) = self - .died_rx - .poll() - .expect("cannot be closed as we hold tx too") - { - self.services.remove(sid); + fn poll_discover( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + let mut this = self.project(); + + while let Poll::Ready(Some(sid)) = this.died_rx.as_mut().poll_next(cx) { + this.services.remove(sid); tracing::trace!( - pool.services = self.services.len(), + pool.services = this.services.len(), message = "removing dropped service" ); } - if self.services.len() == 0 && self.making.is_none() { - let _ = try_ready!(self.maker.poll_ready()); + if this.services.len() == 0 && this.making.is_none() { + let _ = ready!(this.maker.poll_ready(cx))?; tracing::trace!("construct initial pool connection"); - self.making = Some(self.maker.make_service(self.target.clone())); + this.making + .set(Some(this.maker.make_service(this.target.clone()))); } - if let Level::High = self.load { - if self.making.is_none() { - if self + if let Level::High = this.load { + if this.making.is_none() { + if this .limit - .map(|limit| self.services.len() >= limit) + .map(|limit| this.services.len() >= limit) .unwrap_or(false) { - return Ok(Async::NotReady); + return Poll::Pending; } tracing::trace!( - pool.services = self.services.len(), + pool.services = this.services.len(), message = "decided to add service to loaded pool" ); - try_ready!(self.maker.poll_ready()); + ready!(this.maker.poll_ready(cx))?; tracing::trace!("making new service"); // TODO: it'd be great if we could avoid the clone here and use, say, &Target - self.making = Some(self.maker.make_service(self.target.clone())); + this.making + .set(Some(this.maker.make_service(this.target.clone()))); } } - if let Some(mut fut) = self.making.take() { - if let Async::Ready(svc) = fut.poll()? { - let id = self.services.insert(()); - let svc = DropNotifyService { - svc, - id, - notify: self.died_tx.clone(), - }; - tracing::trace!( - pool.services = self.services.len(), - message = "finished creating new service" - ); - self.load = Level::Normal; - return Ok(Async::Ready(Change::Insert(id, svc))); - } else { - self.making = Some(fut); - return Ok(Async::NotReady); - } + if let Some(fut) = this.making.as_mut().as_pin_mut() { + let svc = ready!(fut.poll(cx))?; + this.making.set(None); + + let id = this.services.insert(()); + let svc = DropNotifyService { + svc, + id, + notify: this.died_tx.clone(), + }; + tracing::trace!( + pool.services = this.services.len(), + message = "finished creating new service" + ); + *this.load = Level::Normal; + return Poll::Ready(Ok(Change::Insert(id, svc))); } - match self.load { + match this.load { Level::High => { unreachable!("found high load but no Service being made"); } - Level::Normal => Ok(Async::NotReady), - Level::Low if self.services.len() == 1 => Ok(Async::NotReady), + Level::Normal => Poll::Pending, + Level::Low if this.services.len() == 1 => Poll::Pending, Level::Low => { - self.load = Level::Normal; + *this.load = Level::Normal; // NOTE: this is a little sad -- we'd prefer to kill short-living services - let rm = self.services.iter().next().unwrap().0; + let rm = this.services.iter().next().unwrap().0; // note that we _don't_ remove from self.services here // that'll happen automatically on drop tracing::trace!( - pool.services = self.services.len(), + pool.services = this.services.len(), message = "removing service for over-provisioned pool" ); - Ok(Async::Ready(Change::Remove(rm))) + Poll::Ready(Ok(Change::Remove(rm))) } } } @@ -183,7 +193,7 @@ impl Builder { /// threshold, and there are at least two services active, a service is removed. /// /// The default value is 0.01. That is, when one in every 100 `poll_ready` calls return - /// `NotReady`, then the underlying service is considered underutilized. + /// `Pending`, then the underlying service is considered underutilized. pub fn underutilized_below(&mut self, low: f64) -> &mut Self { self.low = low; self @@ -194,7 +204,7 @@ impl Builder { /// scheduled to be added to the underlying [`Balance`]. /// /// The default value is 0.5. That is, when every other call to `poll_ready` returns - /// `NotReady`, then the underlying service is considered highly loaded. + /// `Pending`, then the underlying service is considered highly loaded. pub fn loaded_above(&mut self, high: f64) -> &mut Self { self.high = high; self @@ -267,7 +277,7 @@ impl Builder { }; Pool { - balance: Balance::from_entropy(d), + balance: Balance::from_entropy(Box::pin(d)), options: *self, ewma: self.init, } @@ -282,7 +292,8 @@ where MS::Error: Into, Target: Clone, { - balance: Balance, Request>, + // the Pin> here is needed since Balance requires the Service to be Unpin + balance: Balance>>, Request>, options: Builder, ewma: f64, } @@ -298,7 +309,7 @@ where { /// Construct a new dynamically sized `Pool`. /// - /// If many calls to `poll_ready` return `NotReady`, `new_service` is used to + /// If many calls to `poll_ready` return `Pending`, `new_service` is used to /// construct another `Service` that is then added to the load-balanced pool. /// If many calls to `poll_ready` succeed, the most recently added `Service` /// is dropped from the pool. @@ -307,6 +318,8 @@ where } } +type PinBalance = Balance>, Request>; + impl Service for Pool where MS: MakeService, @@ -316,48 +329,50 @@ where MS::Error: Into, Target: Clone, { - type Response = , Req> as Service>::Response; - type Error = , Req> as Service>::Error; - type Future = , Req> as Service>::Future; + type Response = , Req> as Service>::Response; + type Error = , Req> as Service>::Error; + type Future = , Req> as Service>::Future; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - if let Async::Ready(()) = self.balance.poll_ready()? { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + if let Poll::Ready(()) = self.balance.poll_ready(cx)? { // services was ready -- there are enough services // update ewma with a 0 sample self.ewma = (1.0 - self.options.alpha) * self.ewma; - let discover = self.balance.discover_mut(); + let mut discover = self.balance.discover_mut().as_mut(); + let discover = discover.project(); if self.ewma < self.options.low { - if discover.load != Level::Low { + if *discover.load != Level::Low { tracing::trace!({ ewma = %self.ewma }, "pool is over-provisioned"); } - discover.load = Level::Low; + *discover.load = Level::Low; if discover.services.len() > 1 { // reset EWMA so we don't immediately try to remove another service self.ewma = self.options.init; } } else { - if discover.load != Level::Normal { + if *discover.load != Level::Normal { tracing::trace!({ ewma = %self.ewma }, "pool is appropriately provisioned"); } - discover.load = Level::Normal; + *discover.load = Level::Normal; } - return Ok(Async::Ready(())); + return Poll::Ready(Ok(())); } - let discover = self.balance.discover_mut(); + let mut discover = self.balance.discover_mut().as_mut(); + let discover = discover.project(); if discover.making.is_none() { // no services are ready -- we're overloaded // update ewma with a 1 sample self.ewma = self.options.alpha + (1.0 - self.options.alpha) * self.ewma; if self.ewma > self.options.high { - if discover.load != Level::High { + if *discover.load != Level::High { tracing::trace!({ ewma = %self.ewma }, "pool is under-provisioned"); } - discover.load = Level::High; + *discover.load = Level::High; // don't reset the EWMA -- in theory, poll_ready should now start returning // `Ready`, so we won't try to launch another service immediately. @@ -366,13 +381,13 @@ where // we need to call balance again for PoolDiscover to realize // it can make a new service - return self.balance.poll_ready(); + return self.balance.poll_ready(cx); } else { - discover.load = Level::Normal; + *discover.load = Level::Normal; } } - Ok(Async::NotReady) + Poll::Pending } fn call(&mut self, req: Req) -> Self::Future { @@ -405,8 +420,8 @@ impl> Service for DropNotifyService type Future = Svc::Future; type Error = Svc::Error; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.svc.poll_ready() + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.svc.poll_ready(cx) } fn call(&mut self, req: Request) -> Self::Future { diff --git a/tower-balance/src/pool/test.rs b/tower-balance/src/pool/test.rs index d73caa41..a07d6a7a 100644 --- a/tower-balance/src/pool/test.rs +++ b/tower-balance/src/pool/test.rs @@ -1,285 +1,200 @@ -use futures::{Async, Future}; +use futures_util::pin_mut; +use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task}; use tower_load as load; use tower_service::Service; -use tower_test::mock; +use tower_test::{assert_request_eq, mock}; use super::*; -macro_rules! assert_fut_ready { - ($fut:expr, $val:expr) => {{ - assert_fut_ready!($fut, $val, "must be ready"); - }}; - ($fut:expr, $val:expr, $msg:expr) => {{ - assert_eq!( - $fut.poll().expect("must not fail"), - Async::Ready($val), - $msg - ); - }}; -} - -macro_rules! assert_ready { - ($svc:expr) => {{ - assert_ready!($svc, "must be ready"); - }}; - ($svc:expr, $msg:expr) => {{ - assert!($svc.poll_ready().expect("must not fail").is_ready(), $msg); - }}; -} - -macro_rules! assert_fut_not_ready { - ($fut:expr) => {{ - assert_fut_not_ready!($fut, "must not be ready"); - }}; - ($fut:expr, $msg:expr) => {{ - assert!(!$fut.poll().expect("must not fail").is_ready(), $msg); - }}; -} - -macro_rules! assert_not_ready { - ($svc:expr) => {{ - assert_not_ready!($svc, "must not be ready"); - }}; - ($svc:expr, $msg:expr) => {{ - assert!(!$svc.poll_ready().expect("must not fail").is_ready(), $msg); - }}; -} - #[test] fn basic() { - // start the pool - let (mock, mut handle) = - mock::pair::<(), load::Constant, usize>>(); - let mut pool = Builder::new().build(mock, ()); - with_task(|| { - assert_not_ready!(pool); - }); + task::mock(|cx| { + // start the pool + let (mock, handle) = + mock::pair::<(), load::Constant, usize>>(); + pin_mut!(handle); - // give the pool a backing service - let (svc1_m, mut svc1) = mock::pair(); - handle - .next_request() - .unwrap() - .1 - .send_response(load::Constant::new(svc1_m, 0)); - with_task(|| { - assert_ready!(pool); - }); + let mut pool = Builder::new().build(mock, ()); + assert_pending!(pool.poll_ready(cx)); - // send a request to the one backing service - let mut fut = pool.call(()); - with_task(|| { - assert_fut_not_ready!(fut); - }); - svc1.next_request().unwrap().1.send_response("foobar"); - with_task(|| { - assert_fut_ready!(fut, "foobar"); + // give the pool a backing service + let (svc1_m, svc1) = mock::pair(); + pin_mut!(svc1); + + assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0)); + assert_ready_ok!(pool.poll_ready(cx)); + + // send a request to the one backing service + let fut = pool.call(()); + pin_mut!(fut); + assert_pending!(fut.as_mut().poll(cx)); + assert_request_eq!(svc1, ()).send_response("foobar"); + assert_eq!(assert_ready_ok!(fut.poll(cx)), "foobar"); }); } #[test] fn high_load() { - // start the pool - let (mock, mut handle) = - mock::pair::<(), load::Constant, usize>>(); - let mut pool = Builder::new() - .urgency(1.0) // so _any_ NotReady will add a service - .underutilized_below(0.0) // so no Ready will remove a service - .max_services(Some(2)) - .build(mock, ()); - with_task(|| { - assert_not_ready!(pool); - }); + task::mock(|cx| { + // start the pool + let (mock, handle) = + mock::pair::<(), load::Constant, usize>>(); + pin_mut!(handle); - // give the pool a backing service - let (svc1_m, mut svc1) = mock::pair(); - svc1.allow(1); - handle - .next_request() - .unwrap() - .1 - .send_response(load::Constant::new(svc1_m, 0)); - with_task(|| { - assert_ready!(pool); - }); + let mut pool = Builder::new() + .urgency(1.0) // so _any_ Pending will add a service + .underutilized_below(0.0) // so no Ready will remove a service + .max_services(Some(2)) + .build(mock, ()); + assert_pending!(pool.poll_ready(cx)); - // make the one backing service not ready - let mut fut1 = pool.call(()); + // give the pool a backing service + let (svc1_m, svc1) = mock::pair(); + pin_mut!(svc1); - // if we poll_ready again, pool should notice that load is increasing - // since urgency == 1.0, it should immediately enter high load - with_task(|| { - assert_not_ready!(pool); - }); - // it should ask the maker for another service, so we give it one - let (svc2_m, mut svc2) = mock::pair(); - svc2.allow(1); - handle - .next_request() - .unwrap() - .1 - .send_response(load::Constant::new(svc2_m, 0)); + svc1.allow(1); + assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0)); + assert_ready_ok!(pool.poll_ready(cx)); - // the pool should now be ready again for one more request - with_task(|| { - assert_ready!(pool); - }); - let mut fut2 = pool.call(()); - with_task(|| { - assert_not_ready!(pool); - }); + // make the one backing service not ready + let fut1 = pool.call(()); + pin_mut!(fut1); - // the pool should _not_ try to add another service - // sicen we have max_services(2) - with_task(|| { - assert!(!handle.poll_request().unwrap().is_ready()); - }); + // if we poll_ready again, pool should notice that load is increasing + // since urgency == 1.0, it should immediately enter high load + assert_pending!(pool.poll_ready(cx)); + // it should ask the maker for another service, so we give it one + let (svc2_m, svc2) = mock::pair(); + pin_mut!(svc2); - // let see that each service got one request - svc1.next_request().unwrap().1.send_response("foo"); - svc2.next_request().unwrap().1.send_response("bar"); - with_task(|| { - assert_fut_ready!(fut1, "foo"); - }); - with_task(|| { - assert_fut_ready!(fut2, "bar"); + svc2.allow(1); + assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0)); + + // the pool should now be ready again for one more request + assert_ready_ok!(pool.poll_ready(cx)); + let fut2 = pool.call(()); + pin_mut!(fut2); + assert_pending!(pool.poll_ready(cx)); + + // the pool should _not_ try to add another service + // sicen we have max_services(2) + assert_pending!(handle.as_mut().poll_request(cx)); + + // let see that each service got one request + assert_request_eq!(svc1, ()).send_response("foo"); + assert_request_eq!(svc2, ()).send_response("bar"); + assert_eq!(assert_ready_ok!(fut1.poll(cx)), "foo"); + assert_eq!(assert_ready_ok!(fut2.poll(cx)), "bar"); }); } #[test] fn low_load() { - // start the pool - let (mock, mut handle) = - mock::pair::<(), load::Constant, usize>>(); - let mut pool = Builder::new() - .urgency(1.0) // so any event will change the service count - .build(mock, ()); - with_task(|| { - assert_not_ready!(pool); - }); + task::mock(|cx| { + // start the pool + let (mock, handle) = + mock::pair::<(), load::Constant, usize>>(); + pin_mut!(handle); - // give the pool a backing service - let (svc1_m, mut svc1) = mock::pair(); - svc1.allow(1); - handle - .next_request() - .unwrap() - .1 - .send_response(load::Constant::new(svc1_m, 0)); - with_task(|| { - assert_ready!(pool); - }); + let mut pool = Builder::new() + .urgency(1.0) // so any event will change the service count + .build(mock, ()); + assert_pending!(pool.poll_ready(cx)); - // cycling a request should now work - let mut fut = pool.call(()); - svc1.next_request().unwrap().1.send_response("foo"); - with_task(|| { - assert_fut_ready!(fut, "foo"); - }); - // and pool should now not be ready (since svc1 isn't ready) - // it should immediately try to add another service - // which we give it - with_task(|| { - assert_not_ready!(pool); - }); - let (svc2_m, mut svc2) = mock::pair(); - svc2.allow(1); - handle - .next_request() - .unwrap() - .1 - .send_response(load::Constant::new(svc2_m, 0)); - // pool is now ready - // which (because of urgency == 1.0) should immediately cause it to drop a service - // it'll drop svc1, so it'll still be ready - with_task(|| { - assert_ready!(pool); - }); - // and even with another ready, it won't drop svc2 since its now the only service - with_task(|| { - assert_ready!(pool); - }); + // give the pool a backing service + let (svc1_m, svc1) = mock::pair(); + pin_mut!(svc1); - // cycling a request should now work on svc2 - let mut fut = pool.call(()); - svc2.next_request().unwrap().1.send_response("foo"); - with_task(|| { - assert_fut_ready!(fut, "foo"); - }); + svc1.allow(1); + assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0)); + assert_ready_ok!(pool.poll_ready(cx)); - // and again (still svc2) - svc2.allow(1); - with_task(|| { - assert_ready!(pool); - }); - let mut fut = pool.call(()); - svc2.next_request().unwrap().1.send_response("foo"); - with_task(|| { - assert_fut_ready!(fut, "foo"); + // cycling a request should now work + let fut = pool.call(()); + pin_mut!(fut); + + assert_request_eq!(svc1, ()).send_response("foo"); + assert_eq!(assert_ready_ok!(fut.poll(cx)), "foo"); + // and pool should now not be ready (since svc1 isn't ready) + // it should immediately try to add another service + // which we give it + assert_pending!(pool.poll_ready(cx)); + let (svc2_m, svc2) = mock::pair(); + pin_mut!(svc2); + + svc2.allow(1); + assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0)); + // pool is now ready + // which (because of urgency == 1.0) should immediately cause it to drop a service + // it'll drop svc1, so it'll still be ready + assert_ready_ok!(pool.poll_ready(cx)); + // and even with another ready, it won't drop svc2 since its now the only service + assert_ready_ok!(pool.poll_ready(cx)); + + // cycling a request should now work on svc2 + let fut = pool.call(()); + pin_mut!(fut); + assert_request_eq!(svc2, ()).send_response("foo"); + assert_eq!(assert_ready_ok!(fut.poll(cx)), "foo"); + + // and again (still svc2) + svc2.allow(1); + assert_ready_ok!(pool.poll_ready(cx)); + let fut = pool.call(()); + pin_mut!(fut); + assert_request_eq!(svc2, ()).send_response("foo"); + assert_eq!(assert_ready_ok!(fut.poll(cx)), "foo"); }); } #[test] fn failing_service() { - // start the pool - let (mock, mut handle) = - mock::pair::<(), load::Constant, usize>>(); - let mut pool = Builder::new() - .urgency(1.0) // so _any_ NotReady will add a service - .underutilized_below(0.0) // so no Ready will remove a service - .build(mock, ()); - with_task(|| { - assert_not_ready!(pool); - }); + task::mock(|cx| { + // start the pool + let (mock, handle) = + mock::pair::<(), load::Constant, usize>>(); + pin_mut!(handle); - // give the pool a backing service - let (svc1_m, mut svc1) = mock::pair(); - svc1.allow(1); - handle - .next_request() - .unwrap() - .1 - .send_response(load::Constant::new(svc1_m, 0)); - with_task(|| { - assert_ready!(pool); - }); + let mut pool = Builder::new() + .urgency(1.0) // so _any_ Pending will add a service + .underutilized_below(0.0) // so no Ready will remove a service + .build(mock, ()); + assert_pending!(pool.poll_ready(cx)); - // one request-response cycle - let mut fut = pool.call(()); - svc1.next_request().unwrap().1.send_response("foo"); - with_task(|| { - assert_fut_ready!(fut, "foo"); - }); + // give the pool a backing service + let (svc1_m, svc1) = mock::pair(); + pin_mut!(svc1); - // now make svc1 fail, so it has to be removed - svc1.send_error("ouch"); - // polling now should recognize the failed service, - // try to create a new one, and then realize the maker isn't ready - with_task(|| { - assert_not_ready!(pool); - }); - // then we release another service - let (svc2_m, mut svc2) = mock::pair(); - svc2.allow(1); - handle - .next_request() - .unwrap() - .1 - .send_response(load::Constant::new(svc2_m, 0)); + svc1.allow(1); + assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0)); + assert_ready_ok!(pool.poll_ready(cx)); - // the pool should now be ready again - with_task(|| { - assert_ready!(pool); - }); - // and a cycle should work (and go through svc2) - let mut fut = pool.call(()); - svc2.next_request().unwrap().1.send_response("bar"); - with_task(|| { - assert_fut_ready!(fut, "bar"); + // one request-response cycle + let fut = pool.call(()); + pin_mut!(fut); + + assert_request_eq!(svc1, ()).send_response("foo"); + assert_eq!(assert_ready_ok!(fut.poll(cx)), "foo"); + + // now make svc1 fail, so it has to be removed + svc1.send_error("ouch"); + // polling now should recognize the failed service, + // try to create a new one, and then realize the maker isn't ready + assert_pending!(pool.poll_ready(cx)); + // then we release another service + let (svc2_m, svc2) = mock::pair(); + pin_mut!(svc2); + + svc2.allow(1); + assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0)); + + // the pool should now be ready again + assert_ready_ok!(pool.poll_ready(cx)); + // and a cycle should work (and go through svc2) + let fut = pool.call(()); + pin_mut!(fut); + + assert_request_eq!(svc2, ()).send_response("bar"); + assert_eq!(assert_ready_ok!(fut.poll(cx)), "bar"); }); } - -fn with_task U, U>(f: F) -> U { - use futures::future::lazy; - lazy(|| Ok::<_, ()>(f())).wait().unwrap() -} diff --git a/tower-discover/src/lib.rs b/tower-discover/src/lib.rs index c0611d5f..eba95d01 100644 --- a/tower-discover/src/lib.rs +++ b/tower-discover/src/lib.rs @@ -16,6 +16,7 @@ mod stream; pub use crate::{list::ServiceList, stream::ServiceStream}; use std::hash::Hash; +use std::ops; use std::{ pin::Pin, task::{Context, Poll}, @@ -43,6 +44,49 @@ pub trait Discover { ) -> Poll, Self::Error>>; } +// delegate through Pin +impl

Discover for Pin

+where + P: Unpin + ops::DerefMut, + P::Target: Discover, +{ + type Key = <

::Target as Discover>::Key; + type Service = <

::Target as Discover>::Service; + type Error = <

::Target as Discover>::Error; + + fn poll_discover( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + Pin::get_mut(self).as_mut().poll_discover(cx) + } +} +impl Discover for &mut D { + type Key = D::Key; + type Service = D::Service; + type Error = D::Error; + + fn poll_discover( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + Discover::poll_discover(Pin::new(&mut **self), cx) + } +} + +impl Discover for Box { + type Key = D::Key; + type Service = D::Service; + type Error = D::Error; + + fn poll_discover( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + D::poll_discover(Pin::new(&mut *self), cx) + } +} + /// A change in the service set pub enum Change { Insert(K, V), diff --git a/tower-load/src/constant.rs b/tower-load/src/constant.rs index e02c1aa4..7777f8c7 100644 --- a/tower-load/src/constant.rs +++ b/tower-load/src/constant.rs @@ -13,6 +13,7 @@ use crate::Load; /// Wraps a type so that `Load::load` returns a constant value. #[pin_project] +#[derive(Debug)] pub struct Constant { inner: T, load: M,