diff --git a/tower-in-flight-limit/Cargo.toml b/tower-in-flight-limit/Cargo.toml index 4e90b6cf..17de5b72 100644 --- a/tower-in-flight-limit/Cargo.toml +++ b/tower-in-flight-limit/Cargo.toml @@ -9,4 +9,5 @@ futures = "0.1" tower-service = { version = "0.1", path = "../tower-service" } [dev-dependencies] +tokio-test = { git = "https://github.com/carllerche/tokio-test" } tower-mock = { version = "0.1", path = "../tower-mock" } diff --git a/tower-in-flight-limit/src/lib.rs b/tower-in-flight-limit/src/lib.rs index 303a41fb..fa60ec59 100644 --- a/tower-in-flight-limit/src/lib.rs +++ b/tower-in-flight-limit/src/lib.rs @@ -218,7 +218,14 @@ impl Shared { /// request has completed OR the service that made the reservation has /// dropped. pub fn release(&self) { - self.curr.fetch_sub(1, SeqCst); + let prev = self.curr.fetch_sub(1, SeqCst); + + // Cannot go above the max number of in-flight + debug_assert!(prev <= self.max); + + if prev == self.max { + self.task.notify(); + } } } diff --git a/tower-in-flight-limit/tests/in_flight_limit.rs b/tower-in-flight-limit/tests/in_flight_limit.rs index 59284cc3..24c93b44 100644 --- a/tower-in-flight-limit/tests/in_flight_limit.rs +++ b/tower-in-flight-limit/tests/in_flight_limit.rs @@ -1,4 +1,5 @@ extern crate futures; +extern crate tokio_test; extern crate tower_mock; extern crate tower_in_flight_limit; extern crate tower_service; @@ -7,9 +8,12 @@ use tower_in_flight_limit::InFlightLimit; use tower_service::Service; use futures::future::{Future, poll_fn}; +use tokio_test::MockTask; #[test] fn basic_service_limit_functionality_with_poll_ready() { + let mut task = MockTask::new(); + let (mut service, mut handle) = new_service(2); @@ -19,10 +23,12 @@ fn basic_service_limit_functionality_with_poll_ready() { poll_fn(|| service.poll_ready()).wait().unwrap(); let r2 = service.call("hello 2"); - with_task(|| { + task.enter(|| { assert!(service.poll_ready().unwrap().is_not_ready()); }); + assert!(!task.is_notified()); + // The request gets passed through let request = handle.next_request().unwrap(); assert_eq!(*request, "hello 1"); @@ -34,17 +40,21 @@ fn basic_service_limit_functionality_with_poll_ready() { request.respond("world 2"); // There are no more requests - with_task(|| { + task.enter(|| { assert!(handle.poll_request().unwrap().is_not_ready()); }); assert_eq!(r1.wait().unwrap(), "world 1"); + assert!(task.is_notified()); // Another request can be sent - poll_fn(|| service.poll_ready()).wait().unwrap(); + task.enter(|| { + assert!(service.poll_ready().unwrap().is_ready()); + }); + let r3 = service.call("hello 3"); - with_task(|| { + task.enter(|| { assert!(service.poll_ready().unwrap().is_not_ready()); }); @@ -60,6 +70,8 @@ fn basic_service_limit_functionality_with_poll_ready() { #[test] fn basic_service_limit_functionality_without_poll_ready() { + let mut task = MockTask::new(); + let (mut service, mut handle) = new_service(2); @@ -79,7 +91,7 @@ fn basic_service_limit_functionality_without_poll_ready() { request.respond("world 2"); // There are no more requests - with_task(|| { + task.enter(|| { assert!(handle.poll_request().unwrap().is_not_ready()); }); @@ -103,17 +115,19 @@ fn basic_service_limit_functionality_without_poll_ready() { #[test] fn request_without_capacity() { + let mut task = MockTask::new(); + let (mut service, mut handle) = new_service(0); - with_task(|| { + task.enter(|| { assert!(service.poll_ready().unwrap().is_not_ready()); }); let response = service.call("hello"); // There are no more requests - with_task(|| { + task.enter(|| { assert!(handle.poll_request().unwrap().is_not_ready()); }); @@ -122,18 +136,20 @@ fn request_without_capacity() { #[test] fn reserve_capacity_without_sending_request() { + let mut task = MockTask::new(); + let (mut s1, mut handle) = new_service(1); let mut s2 = s1.clone(); // Reserve capacity in s1 - with_task(|| { + task.enter(|| { assert!(s1.poll_ready().unwrap().is_ready()); }); // Service 2 cannot get capacity - with_task(|| { + task.enter(|| { assert!(s2.poll_ready().unwrap().is_not_ready()); }); @@ -142,50 +158,54 @@ fn reserve_capacity_without_sending_request() { let request = handle.next_request().unwrap(); request.respond("world"); - with_task(|| { + task.enter(|| { assert!(s2.poll_ready().unwrap().is_not_ready()); }); r1.wait().unwrap(); - with_task(|| { + task.enter(|| { assert!(s2.poll_ready().unwrap().is_ready()); }); } #[test] fn service_drop_frees_capacity() { + let mut task = MockTask::new(); + let (mut s1, _handle) = new_service(1); let mut s2 = s1.clone(); // Reserve capacity in s1 - with_task(|| { + task.enter(|| { assert!(s1.poll_ready().unwrap().is_ready()); }); // Service 2 cannot get capacity - with_task(|| { + task.enter(|| { assert!(s2.poll_ready().unwrap().is_not_ready()); }); drop(s1); - with_task(|| { + task.enter(|| { assert!(s2.poll_ready().unwrap().is_ready()); }); } #[test] fn response_error_releases_capacity() { + let mut task = MockTask::new(); + let (mut s1, mut handle) = new_service(1); let mut s2 = s1.clone(); // Reserve capacity in s1 - with_task(|| { + task.enter(|| { assert!(s1.poll_ready().unwrap().is_ready()); }); @@ -196,33 +216,35 @@ fn response_error_releases_capacity() { r1.wait().unwrap_err(); - with_task(|| { + task.enter(|| { assert!(s2.poll_ready().unwrap().is_ready()); }); } #[test] fn response_future_drop_releases_capacity() { + let mut task = MockTask::new(); + let (mut s1, _handle) = new_service(1); let mut s2 = s1.clone(); // Reserve capacity in s1 - with_task(|| { + task.enter(|| { assert!(s1.poll_ready().unwrap().is_ready()); }); // s1 sends the request, then s2 is able to get capacity let r1 = s1.call("hello"); - with_task(|| { + task.enter(|| { assert!(s2.poll_ready().unwrap().is_not_ready()); }); drop(r1); - with_task(|| { + task.enter(|| { assert!(s2.poll_ready().unwrap().is_ready()); }); } @@ -235,8 +257,3 @@ fn new_service(max: usize) -> (InFlightLimit, Handle) { let service = InFlightLimit::new(service, max); (service, handle) } - -fn with_task U, U>(f: F) -> U { - use futures::future::{Future, lazy}; - lazy(|| Ok::<_, ()>(f())).wait().unwrap() -}