Allow reusable concurrency limit via GlobalConcurrencyLimit (#574)

* limit: global concurrency limit layer from a owned semaphore

* new_owned -> new_shared + docs improvements

Co-authored-by: David Pedersen <david.pdrsn@gmail.com>

* keep exposing Semaphore, but rename the API a bit and make it simpler to use

* missed a spot

* minor docs fixes

* update changelog

Co-authored-by: David Pedersen <david.pdrsn@gmail.com>
Co-authored-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
Jerome Gravel-Niquet 2021-05-28 10:45:44 -04:00 committed by GitHub
parent 71ece8ea5a
commit 74f9047f30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 49 additions and 3 deletions

View File

@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **builder**: Add `ServiceBuilder::map_result` analogous to
`ServiceExt::map_result`.
- **limit**: Add `GlobalConcurrencyLimitLayer` allowing to reuse concurrency
limit across multiple services ([#574])
# 0.4.7 (April 27, 2021)

View File

@ -1,4 +1,7 @@
use std::sync::Arc;
use super::ConcurrencyLimit;
use tokio::sync::Semaphore;
use tower_layer::Layer;
/// Enforces a limit on the concurrent number of requests the underlying
@ -22,3 +25,36 @@ impl<S> Layer<S> for ConcurrencyLimitLayer {
ConcurrencyLimit::new(service, self.max)
}
}
/// Enforces a limit on the concurrent number of requests the underlying
/// service can handle.
///
/// Unlike [`ConcurrencyLimitLayer`], which enforces a per-service concurrency
/// limit, this layer accepts a owned semaphore (`Arc<Semaphore>`) which can be
/// shared across multiple services.
///
/// Cloning this layer will not create a new semaphore.
#[derive(Debug, Clone)]
pub struct GlobalConcurrencyLimitLayer {
semaphore: Arc<Semaphore>,
}
impl GlobalConcurrencyLimitLayer {
/// Create a new `GlobalConcurrencyLimitLayer`.
pub fn new(max: usize) -> Self {
Self::with_semaphore(Arc::new(Semaphore::new(max)))
}
/// Create a new `GlobalConcurrencyLimitLayer` from a `Arc<Semaphore>`
pub fn with_semaphore(semaphore: Arc<Semaphore>) -> Self {
GlobalConcurrencyLimitLayer { semaphore }
}
}
impl<S> Layer<S> for GlobalConcurrencyLimitLayer {
type Service = ConcurrencyLimit<S>;
fn layer(&self, service: S) -> Self::Service {
ConcurrencyLimit::with_semaphore(service, self.semaphore.clone())
}
}

View File

@ -4,4 +4,7 @@ pub mod future;
mod layer;
mod service;
pub use self::{layer::ConcurrencyLimitLayer, service::ConcurrencyLimit};
pub use self::{
layer::{ConcurrencyLimitLayer, GlobalConcurrencyLimitLayer},
service::ConcurrencyLimit,
};

View File

@ -26,9 +26,14 @@ pub struct ConcurrencyLimit<T> {
impl<T> ConcurrencyLimit<T> {
/// Create a new concurrency limiter.
pub fn new(inner: T, max: usize) -> Self {
Self::with_semaphore(inner, Arc::new(Semaphore::new(max)))
}
/// Create a new concurrency limiter with a provided shared semaphore
pub fn with_semaphore(inner: T, semaphore: Arc<Semaphore>) -> Self {
ConcurrencyLimit {
inner,
semaphore: PollSemaphore::new(Arc::new(Semaphore::new(max))),
semaphore: PollSemaphore::new(semaphore),
permit: None,
}
}

View File

@ -4,6 +4,6 @@ pub mod concurrency;
pub mod rate;
pub use self::{
concurrency::{ConcurrencyLimit, ConcurrencyLimitLayer},
concurrency::{ConcurrencyLimit, ConcurrencyLimitLayer, GlobalConcurrencyLimitLayer},
rate::{RateLimit, RateLimitLayer},
};