diff --git a/tower/CHANGELOG.md b/tower/CHANGELOG.md index 14881540..4bb381eb 100644 --- a/tower/CHANGELOG.md +++ b/tower/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **builder**: Add `ServiceBuilder::map_result` analogous to `ServiceExt::map_result`. +- **limit**: Add `GlobalConcurrencyLimitLayer` allowing to reuse concurrency + limit across multiple services ([#574]) # 0.4.7 (April 27, 2021) diff --git a/tower/src/limit/concurrency/layer.rs b/tower/src/limit/concurrency/layer.rs index 68ecd810..1966d578 100644 --- a/tower/src/limit/concurrency/layer.rs +++ b/tower/src/limit/concurrency/layer.rs @@ -1,4 +1,7 @@ +use std::sync::Arc; + use super::ConcurrencyLimit; +use tokio::sync::Semaphore; use tower_layer::Layer; /// Enforces a limit on the concurrent number of requests the underlying @@ -22,3 +25,36 @@ impl Layer for ConcurrencyLimitLayer { ConcurrencyLimit::new(service, self.max) } } + +/// Enforces a limit on the concurrent number of requests the underlying +/// service can handle. +/// +/// Unlike [`ConcurrencyLimitLayer`], which enforces a per-service concurrency +/// limit, this layer accepts a owned semaphore (`Arc`) which can be +/// shared across multiple services. +/// +/// Cloning this layer will not create a new semaphore. +#[derive(Debug, Clone)] +pub struct GlobalConcurrencyLimitLayer { + semaphore: Arc, +} + +impl GlobalConcurrencyLimitLayer { + /// Create a new `GlobalConcurrencyLimitLayer`. + pub fn new(max: usize) -> Self { + Self::with_semaphore(Arc::new(Semaphore::new(max))) + } + + /// Create a new `GlobalConcurrencyLimitLayer` from a `Arc` + pub fn with_semaphore(semaphore: Arc) -> Self { + GlobalConcurrencyLimitLayer { semaphore } + } +} + +impl Layer for GlobalConcurrencyLimitLayer { + type Service = ConcurrencyLimit; + + fn layer(&self, service: S) -> Self::Service { + ConcurrencyLimit::with_semaphore(service, self.semaphore.clone()) + } +} diff --git a/tower/src/limit/concurrency/mod.rs b/tower/src/limit/concurrency/mod.rs index a1eba7e0..ac0be8a5 100644 --- a/tower/src/limit/concurrency/mod.rs +++ b/tower/src/limit/concurrency/mod.rs @@ -4,4 +4,7 @@ pub mod future; mod layer; mod service; -pub use self::{layer::ConcurrencyLimitLayer, service::ConcurrencyLimit}; +pub use self::{ + layer::{ConcurrencyLimitLayer, GlobalConcurrencyLimitLayer}, + service::ConcurrencyLimit, +}; diff --git a/tower/src/limit/concurrency/service.rs b/tower/src/limit/concurrency/service.rs index 5f8601f8..02d85345 100644 --- a/tower/src/limit/concurrency/service.rs +++ b/tower/src/limit/concurrency/service.rs @@ -26,9 +26,14 @@ pub struct ConcurrencyLimit { impl ConcurrencyLimit { /// Create a new concurrency limiter. pub fn new(inner: T, max: usize) -> Self { + Self::with_semaphore(inner, Arc::new(Semaphore::new(max))) + } + + /// Create a new concurrency limiter with a provided shared semaphore + pub fn with_semaphore(inner: T, semaphore: Arc) -> Self { ConcurrencyLimit { inner, - semaphore: PollSemaphore::new(Arc::new(Semaphore::new(max))), + semaphore: PollSemaphore::new(semaphore), permit: None, } } diff --git a/tower/src/limit/mod.rs b/tower/src/limit/mod.rs index 9af07e1b..6a10dcae 100644 --- a/tower/src/limit/mod.rs +++ b/tower/src/limit/mod.rs @@ -4,6 +4,6 @@ pub mod concurrency; pub mod rate; pub use self::{ - concurrency::{ConcurrencyLimit, ConcurrencyLimitLayer}, + concurrency::{ConcurrencyLimit, ConcurrencyLimitLayer, GlobalConcurrencyLimitLayer}, rate::{RateLimit, RateLimitLayer}, };