diff --git a/tower-balance/Cargo.toml b/tower-balance/Cargo.toml
index dacc2548..4db8f662 100644
--- a/tower-balance/Cargo.toml
+++ b/tower-balance/Cargo.toml
@@ -38,6 +38,7 @@ tower-layer = "0.1.0"
 tower-load = { version = "0.1.0", path = "../tower-load" }
 tower-service = "0.2.0"
 tower-util = "0.1.0"
+slab = "0.4"
 
 [dev-dependencies]
 tracing-fmt = { git = "https://github.com/tokio-rs/tracing.git" }
diff --git a/tower-balance/src/pool.rs b/tower-balance/src/pool/mod.rs
similarity index 78%
rename from tower-balance/src/pool.rs
rename to tower-balance/src/pool/mod.rs
index 7d1c1716..d3420f55 100644
--- a/tower-balance/src/pool.rs
+++ b/tower-balance/src/pool/mod.rs
@@ -15,12 +15,17 @@
 #![deny(missing_docs)]
 
 use super::p2c::Balance;
-use futures::{try_ready, Async, Future, Poll};
+use crate::error;
+use futures::{try_ready, Async, Future, Poll, Stream};
+use slab::Slab;
 use tower_discover::{Change, Discover};
 use tower_load::Load;
 use tower_service::Service;
 use tower_util::MakeService;
 
+#[cfg(test)]
+mod test;
+
 #[derive(Debug, Clone, Copy, Eq, PartialEq)]
 enum Level {
     /// Load is low -- remove a service instance.
@@ -41,24 +46,38 @@ where
     making: Option<MS::Future>,
     target: Target,
     load: Level,
-    services: usize,
+    services: Slab<()>,
+    died_tx: tokio_sync::mpsc::UnboundedSender<usize>,
+    died_rx: tokio_sync::mpsc::UnboundedReceiver<usize>,
     limit: Option<usize>,
 }
 
 impl<MS, Target, Request> Discover for PoolDiscoverer<MS, Target, Request>
 where
     MS: MakeService<Target, Request>,
-    // NOTE: these bounds should go away once MakeService adopts Box
-    MS::MakeError: ::std::error::Error + Send + Sync + 'static,
-    MS::Error: ::std::error::Error + Send + Sync + 'static,
+    MS::MakeError: Into<error::Error>,
+    MS::Error: Into<error::Error>,
     Target: Clone,
 {
     type Key = usize;
-    type Service = MS::Service;
+    type Service = DropNotifyService<MS::Service>;
     type Error = MS::MakeError;
 
     fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::Error> {
-        if self.services == 0 && self.making.is_none() {
+        while let Async::Ready(Some(sid)) = self
+            .died_rx
+            .poll()
+            .expect("cannot be closed as we hold tx too")
+        {
+            self.services.remove(sid);
+            tracing::trace!(
+                pool.services = self.services.len(),
+                message = "removing dropped service"
+            );
+        }
+
+        if self.services.len() == 0 && self.making.is_none() {
+            let _ = try_ready!(self.maker.poll_ready());
             tracing::trace!("construct initial pool connection");
             self.making = Some(self.maker.make_service(self.target.clone()));
         }
@@ -67,14 +86,14 @@ where
             if self.making.is_none() {
                 if self
                     .limit
-                    .map(|limit| self.services >= limit)
+                    .map(|limit| self.services.len() >= limit)
                     .unwrap_or(false)
                 {
                     return Ok(Async::NotReady);
                 }
 
                 tracing::trace!(
-                    pool.services = self.services,
+                    pool.services = self.services.len(),
                     message = "decided to add service to loaded pool"
                 );
                 try_ready!(self.maker.poll_ready());
@@ -85,14 +104,19 @@ where
         }
 
         if let Some(mut fut) = self.making.take() {
-            if let Async::Ready(s) = fut.poll()? {
-                self.services += 1;
+            if let Async::Ready(svc) = fut.poll()? {
+                let id = self.services.insert(());
+                let svc = DropNotifyService {
+                    svc,
+                    id,
+                    notify: self.died_tx.clone(),
+                };
                 tracing::trace!(
-                    pool.services = self.services,
+                    pool.services = self.services.len(),
                     message = "finished creating new service"
                 );
                 self.load = Level::Normal;
-                return Ok(Async::Ready(Change::Insert(self.services, s)));
+                return Ok(Async::Ready(Change::Insert(id, svc)));
             } else {
                 self.making = Some(fut);
                 return Ok(Async::NotReady);
@@ -104,13 +128,15 @@ where
                 unreachable!("found high load but no Service being made");
             }
             Level::Normal => Ok(Async::NotReady),
-            Level::Low if self.services == 1 => Ok(Async::NotReady),
+            Level::Low if self.services.len() == 1 => Ok(Async::NotReady),
             Level::Low => {
                 self.load = Level::Normal;
-                let rm = self.services;
-                self.services -= 1;
+                // NOTE: this is a little sad -- we'd prefer to kill short-living services
+                let rm = self.services.iter().next().unwrap().0;
+                // note that we _don't_ remove from self.services here
+                // that'll happen automatically on drop
                 tracing::trace!(
-                    pool.services = self.services,
+                    pool.services = self.services.len(),
                     message = "removing service for over-provisioned pool"
                 );
                 Ok(Async::Ready(Change::Remove(rm)))
@@ -224,16 +250,19 @@ impl Builder {
         MS: MakeService<Target, Request>,
         MS::Service: Load,
         <MS::Service as Load>::Metric: std::fmt::Debug,
-        MS::MakeError: ::std::error::Error + Send + Sync + 'static,
-        MS::Error: ::std::error::Error + Send + Sync + 'static,
+        MS::MakeError: Into<error::Error>,
+        MS::Error: Into<error::Error>,
         Target: Clone,
     {
+        let (died_tx, died_rx) = tokio_sync::mpsc::unbounded_channel();
         let d = PoolDiscoverer {
             maker: make_service,
             making: None,
             target,
             load: Level::Normal,
-            services: 0,
+            services: Slab::new(),
+            died_tx,
+            died_rx,
             limit: self.limit,
         };
 
@@ -249,8 +278,8 @@ impl Builder {
 pub struct Pool<MS, Target, Request>
 where
     MS: MakeService<Target, Request>,
-    MS::MakeError: ::std::error::Error + Send + Sync + 'static,
-    MS::Error: ::std::error::Error + Send + Sync + 'static,
+    MS::MakeError: Into<error::Error>,
+    MS::Error: Into<error::Error>,
     Target: Clone,
 {
     balance: Balance<PoolDiscoverer<MS, Target, Request>, Request>,
@@ -263,8 +292,8 @@ where
     MS: MakeService<Target, Request>,
     MS::Service: Load,
     <MS::Service as Load>::Metric: std::fmt::Debug,
-    MS::MakeError: ::std::error::Error + Send + Sync + 'static,
-    MS::Error: ::std::error::Error + Send + Sync + 'static,
+    MS::MakeError: Into<error::Error>,
+    MS::Error: Into<error::Error>,
     Target: Clone,
 {
     /// Construct a new dynamically sized `Pool`.
@@ -283,8 +312,8 @@ where
     MS: MakeService<Target, Req>,
     MS::Service: Load,
     <MS::Service as Load>::Metric: std::fmt::Debug,
-    MS::MakeError: ::std::error::Error + Send + Sync + 'static,
-    MS::Error: ::std::error::Error + Send + Sync + 'static,
+    MS::MakeError: Into<error::Error>,
+    MS::Error: Into<error::Error>,
     Target: Clone,
 {
     type Response = <Balance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Response;
@@ -304,7 +333,7 @@ where
             }
             discover.load = Level::Low;
 
-            if discover.services > 1 {
+            if discover.services.len() > 1 {
                 // reset EWMA so we don't immediately try to remove another service
                 self.ewma = self.options.init;
             }
@@ -334,6 +363,10 @@ where
             // `Ready`, so we won't try to launch another service immediately.
            // we clamp it to high though in case the # of services is limited.
            self.ewma = self.options.high;
+
+            // we need to call balance again for PoolDiscover to realize
+            // it can make a new service
+            return self.balance.poll_ready();
         } else {
             discover.load = Level::Normal;
         }
@@ -346,3 +379,37 @@ where
         self.balance.call(req)
     }
 }
+
+#[doc(hidden)]
+pub struct DropNotifyService<Svc> {
+    svc: Svc,
+    id: usize,
+    notify: tokio_sync::mpsc::UnboundedSender<usize>,
+}
+
+impl<Svc> Drop for DropNotifyService<Svc> {
+    fn drop(&mut self) {
+        let _ = self.notify.try_send(self.id).is_ok();
+    }
+}
+
+impl<Svc: Load> Load for DropNotifyService<Svc> {
+    type Metric = Svc::Metric;
+    fn load(&self) -> Self::Metric {
+        self.svc.load()
+    }
+}
+
+impl<Request, Svc: Service<Request>> Service<Request> for DropNotifyService<Svc> {
+    type Response = Svc::Response;
+    type Future = Svc::Future;
+    type Error = Svc::Error;
+
+    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
+        self.svc.poll_ready()
+    }
+
+    fn call(&mut self, req: Request) -> Self::Future {
+        self.svc.call(req)
+    }
+}
diff --git a/tower-balance/src/pool/test.rs b/tower-balance/src/pool/test.rs
new file mode 100644
index 00000000..d73caa41
--- /dev/null
+++ b/tower-balance/src/pool/test.rs
@@ -0,0 +1,285 @@
+use futures::{Async, Future};
+use tower_load as load;
+use tower_service::Service;
+use tower_test::mock;
+
+use super::*;
+
+macro_rules! assert_fut_ready {
+    ($fut:expr, $val:expr) => {{
+        assert_fut_ready!($fut, $val, "must be ready");
+    }};
+    ($fut:expr, $val:expr, $msg:expr) => {{
+        assert_eq!(
+            $fut.poll().expect("must not fail"),
+            Async::Ready($val),
+            $msg
+        );
+    }};
+}
+
+macro_rules! assert_ready {
+    ($svc:expr) => {{
+        assert_ready!($svc, "must be ready");
+    }};
+    ($svc:expr, $msg:expr) => {{
+        assert!($svc.poll_ready().expect("must not fail").is_ready(), $msg);
+    }};
+}
+
+macro_rules! assert_fut_not_ready {
+    ($fut:expr) => {{
+        assert_fut_not_ready!($fut, "must not be ready");
+    }};
+    ($fut:expr, $msg:expr) => {{
+        assert!(!$fut.poll().expect("must not fail").is_ready(), $msg);
+    }};
+}
+
+macro_rules! assert_not_ready {
assert_not_ready { + ($svc:expr) => {{ + assert_not_ready!($svc, "must not be ready"); + }}; + ($svc:expr, $msg:expr) => {{ + assert!(!$svc.poll_ready().expect("must not fail").is_ready(), $msg); + }}; +} + +#[test] +fn basic() { + // start the pool + let (mock, mut handle) = + mock::pair::<(), load::Constant, usize>>(); + let mut pool = Builder::new().build(mock, ()); + with_task(|| { + assert_not_ready!(pool); + }); + + // give the pool a backing service + let (svc1_m, mut svc1) = mock::pair(); + handle + .next_request() + .unwrap() + .1 + .send_response(load::Constant::new(svc1_m, 0)); + with_task(|| { + assert_ready!(pool); + }); + + // send a request to the one backing service + let mut fut = pool.call(()); + with_task(|| { + assert_fut_not_ready!(fut); + }); + svc1.next_request().unwrap().1.send_response("foobar"); + with_task(|| { + assert_fut_ready!(fut, "foobar"); + }); +} + +#[test] +fn high_load() { + // start the pool + let (mock, mut handle) = + mock::pair::<(), load::Constant, usize>>(); + let mut pool = Builder::new() + .urgency(1.0) // so _any_ NotReady will add a service + .underutilized_below(0.0) // so no Ready will remove a service + .max_services(Some(2)) + .build(mock, ()); + with_task(|| { + assert_not_ready!(pool); + }); + + // give the pool a backing service + let (svc1_m, mut svc1) = mock::pair(); + svc1.allow(1); + handle + .next_request() + .unwrap() + .1 + .send_response(load::Constant::new(svc1_m, 0)); + with_task(|| { + assert_ready!(pool); + }); + + // make the one backing service not ready + let mut fut1 = pool.call(()); + + // if we poll_ready again, pool should notice that load is increasing + // since urgency == 1.0, it should immediately enter high load + with_task(|| { + assert_not_ready!(pool); + }); + // it should ask the maker for another service, so we give it one + let (svc2_m, mut svc2) = mock::pair(); + svc2.allow(1); + handle + .next_request() + .unwrap() + .1 + .send_response(load::Constant::new(svc2_m, 0)); + + // the pool should now be ready again for one more request + with_task(|| { + assert_ready!(pool); + }); + let mut fut2 = pool.call(()); + with_task(|| { + assert_not_ready!(pool); + }); + + // the pool should _not_ try to add another service + // sicen we have max_services(2) + with_task(|| { + assert!(!handle.poll_request().unwrap().is_ready()); + }); + + // let see that each service got one request + svc1.next_request().unwrap().1.send_response("foo"); + svc2.next_request().unwrap().1.send_response("bar"); + with_task(|| { + assert_fut_ready!(fut1, "foo"); + }); + with_task(|| { + assert_fut_ready!(fut2, "bar"); + }); +} + +#[test] +fn low_load() { + // start the pool + let (mock, mut handle) = + mock::pair::<(), load::Constant, usize>>(); + let mut pool = Builder::new() + .urgency(1.0) // so any event will change the service count + .build(mock, ()); + with_task(|| { + assert_not_ready!(pool); + }); + + // give the pool a backing service + let (svc1_m, mut svc1) = mock::pair(); + svc1.allow(1); + handle + .next_request() + .unwrap() + .1 + .send_response(load::Constant::new(svc1_m, 0)); + with_task(|| { + assert_ready!(pool); + }); + + // cycling a request should now work + let mut fut = pool.call(()); + svc1.next_request().unwrap().1.send_response("foo"); + with_task(|| { + assert_fut_ready!(fut, "foo"); + }); + // and pool should now not be ready (since svc1 isn't ready) + // it should immediately try to add another service + // which we give it + with_task(|| { + assert_not_ready!(pool); + }); + let (svc2_m, mut svc2) = 
+    svc2.allow(1);
+    handle
+        .next_request()
+        .unwrap()
+        .1
+        .send_response(load::Constant::new(svc2_m, 0));
+    // pool is now ready
+    // which (because of urgency == 1.0) should immediately cause it to drop a service
+    // it'll drop svc1, so it'll still be ready
+    with_task(|| {
+        assert_ready!(pool);
+    });
+    // and even with another ready, it won't drop svc2 since it's now the only service
+    with_task(|| {
+        assert_ready!(pool);
+    });
+
+    // cycling a request should now work on svc2
+    let mut fut = pool.call(());
+    svc2.next_request().unwrap().1.send_response("foo");
+    with_task(|| {
+        assert_fut_ready!(fut, "foo");
+    });
+
+    // and again (still svc2)
+    svc2.allow(1);
+    with_task(|| {
+        assert_ready!(pool);
+    });
+    let mut fut = pool.call(());
+    svc2.next_request().unwrap().1.send_response("foo");
+    with_task(|| {
+        assert_fut_ready!(fut, "foo");
+    });
+}
+
+#[test]
+fn failing_service() {
+    // start the pool
+    let (mock, mut handle) =
+        mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
+    let mut pool = Builder::new()
+        .urgency(1.0) // so _any_ NotReady will add a service
+        .underutilized_below(0.0) // so no Ready will remove a service
+        .build(mock, ());
+    with_task(|| {
+        assert_not_ready!(pool);
+    });
+
+    // give the pool a backing service
+    let (svc1_m, mut svc1) = mock::pair();
+    svc1.allow(1);
+    handle
+        .next_request()
+        .unwrap()
+        .1
+        .send_response(load::Constant::new(svc1_m, 0));
+    with_task(|| {
+        assert_ready!(pool);
+    });
+
+    // one request-response cycle
+    let mut fut = pool.call(());
+    svc1.next_request().unwrap().1.send_response("foo");
+    with_task(|| {
+        assert_fut_ready!(fut, "foo");
+    });
+
+    // now make svc1 fail, so it has to be removed
+    svc1.send_error("ouch");
+    // polling now should recognize the failed service,
+    // try to create a new one, and then realize the maker isn't ready
+    with_task(|| {
+        assert_not_ready!(pool);
+    });
+    // then we release another service
+    let (svc2_m, mut svc2) = mock::pair();
+    svc2.allow(1);
+    handle
+        .next_request()
+        .unwrap()
+        .1
+        .send_response(load::Constant::new(svc2_m, 0));
+
+    // the pool should now be ready again
+    with_task(|| {
+        assert_ready!(pool);
+    });
+    // and a cycle should work (and go through svc2)
+    let mut fut = pool.call(());
+    svc2.next_request().unwrap().1.send_response("bar");
+    with_task(|| {
+        assert_fut_ready!(fut, "bar");
+    });
+}
+
+fn with_task<F: FnOnce() -> U, U>(f: F) -> U {
+    use futures::future::lazy;
+    lazy(|| Ok::<_, ()>(f())).wait().unwrap()
+}
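
Note for reviewers (not part of the patch): below is a rough sketch of how the pieces above fit together, using only names that appear in this diff (the mock maker and load::Constant wrapper from the tests, plus the Builder knobs urgency, underutilized_below, and max_services exercised there). The tower_balance::pool import path and the demo_pool function are assumptions made for illustration, and the builder values are arbitrary.

use tower_balance::pool::{Builder, Pool};
use tower_load as load;
use tower_test::mock;

// Illustrative only: build a resizable pool around a mock MakeService, mirroring
// the setup in pool/test.rs. A real caller would substitute its own maker whose
// services implement Load and whose errors convert into the crate's boxed error.
fn demo_pool() {
    // `maker` yields services wrapped in load::Constant so the balancer can
    // compare their load; `_handle` is how the tests hand out each new service.
    let (maker, _handle) =
        mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();

    // The resulting Pool is itself a Service: poll_ready() drives PoolDiscoverer
    // (growing on high load, shrinking on low load, reaping dropped services),
    // and call() dispatches through the inner p2c Balance.
    let _pool: Pool<_, _, ()> = Builder::new()
        .urgency(1.0)             // act on every observed load change
        .underutilized_below(0.0) // never shrink merely because load is low
        .max_services(Some(2))    // hard cap on live services
        .build(maker, ());
}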