retry: Add Budget trait (#703)

This commit is contained in:
boraarslan 2022-10-24 21:28:22 +03:00 committed by GitHub
parent 582a0e0c74
commit d27ba65891
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 183 additions and 180 deletions

View File

@ -0,0 +1,91 @@
//! A retry "budget" for allowing only a certain amount of retries over time.
//!
//! # Why budgets and not max retries?
//!
//! The most common way of configuring retries is to specify a maximum
//! number of retry attempts to perform before giving up. This is a familiar idea to anyone
//! whos used a web browser: you try to load a webpage, and if it doesnt load, you try again.
//! If it still doesnt load, you try a third time. Finally you give up.
//!
//! Unfortunately, there are at least two problems with configuring retries this way:
//!
//! **Choosing the maximum number of retry attempts is a guessing game.**
//! You need to pick a number thats high enough to make a difference when things are somewhat failing,
//! but not so high that it generates extra load on the system when its really failing. In practice,
//! you usually pick a maximum retry attempts number out of a hat (e.g. 3) and hope for the best.
//!
//! **Systems configured this way are vulnerable to retry storms.**
//! A retry storm begins when one service starts to experience a larger than normal failure rate.
//! This causes its clients to retry those failed requests. The extra load from the retries causes the
//! service to slow down further and fail more requests, triggering more retries. If each client is
//! configured to retry up to 3 times, this can quadruple the number of requests being sent! To make
//! matters even worse, if any of the clients clients are configured with retries, the number of retries
//! compounds multiplicatively and can turn a small number of errors into a self-inflicted denial of service attack.
//!
//! It's generally dangerous to implement retries without some limiting factor. [`Budget`]s are that limit.
//!
//! # Examples
//!
//! ```rust
//! use std::sync::Arc;
//!
//! use futures_util::future;
//! use tower::retry::{budget::{Budget, TpsBudget}, Policy};
//!
//! type Req = String;
//! type Res = String;
//!
//! #[derive(Clone, Debug)]
//! struct RetryPolicy {
//! budget: Arc<TpsBudget>,
//! }
//!
//! impl<E> Policy<Req, Res, E> for RetryPolicy {
//! type Future = future::Ready<()>;
//!
//! fn retry(&mut self, req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future> {
//! match result {
//! Ok(_) => {
//! // Treat all `Response`s as success,
//! // so deposit budget and don't retry...
//! self.budget.deposit();
//! None
//! }
//! Err(_) => {
//! // Treat all errors as failures...
//! // Withdraw the budget, don't retry if we overdrew.
//! let withdrew = self.budget.withdraw();
//! if !withdrew {
//! return None;
//! }
//!
//! // Try again!
//! Some(future::ready(()))
//! }
//! }
//! }
//!
//! fn clone_request(&mut self, req: &Req) -> Option<Req> {
//! Some(req.clone())
//! }
//! }
//! ```
pub mod tps_budget;
pub use tps_budget::TpsBudget;
/// For more info about [`Budget`], please see the [module-level documentation].
///
/// [module-level documentation]: self
pub trait Budget {
/// Store a "deposit" in the budget, which will be used to permit future
/// withdrawals.
fn deposit(&self);
/// Check whether there is enough "balance" in the budget to issue a new
/// retry.
///
/// If there is not enough, false is returned.
fn withdraw(&self) -> bool;
}

View File

@ -1,75 +1,4 @@
//! A retry "budget" for allowing only a certain amount of retries over time.
//!
//! # Why budgets and not max retries?
//!
//! The most common way of configuring retries is to specify a maximum
//! number of retry attempts to perform before giving up. This is a familiar idea to anyone
//! whos used a web browser: you try to load a webpage, and if it doesnt load, you try again.
//! If it still doesnt load, you try a third time. Finally you give up.
//!
//! Unfortunately, there are at least two problems with configuring retries this way:
//!
//! **Choosing the maximum number of retry attempts is a guessing game.**
//! You need to pick a number thats high enough to make a difference when things are somewhat failing,
//! but not so high that it generates extra load on the system when its really failing. In practice,
//! you usually pick a maximum retry attempts number out of a hat (e.g. 3) and hope for the best.
//!
//! **Systems configured this way are vulnerable to retry storms.**
//! A retry storm begins when one service starts to experience a larger than normal failure rate.
//! This causes its clients to retry those failed requests. The extra load from the retries causes the
//! service to slow down further and fail more requests, triggering more retries. If each client is
//! configured to retry up to 3 times, this can quadruple the number of requests being sent! To make
//! matters even worse, if any of the clients clients are configured with retries, the number of retries
//! compounds multiplicatively and can turn a small number of errors into a self-inflicted denial of service attack.
//!
//! It's generally dangerous to implement retries without some limiting factor. [`Budget`]s are that limit.
//!
//! # Examples
//!
//! ```rust
//! use std::sync::Arc;
//!
//! use futures_util::future;
//! use tower::retry::{budget::Budget, Policy};
//!
//! type Req = String;
//! type Res = String;
//!
//! #[derive(Clone, Debug)]
//! struct RetryPolicy {
//! budget: Arc<Budget>,
//! }
//!
//! impl<E> Policy<Req, Res, E> for RetryPolicy {
//! type Future = future::Ready<()>;
//!
//! fn retry(&mut self, req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future> {
//! match result {
//! Ok(_) => {
//! // Treat all `Response`s as success,
//! // so deposit budget and don't retry...
//! self.budget.deposit();
//! None
//! }
//! Err(_) => {
//! // Treat all errors as failures...
//! // Withdraw the budget, don't retry if we overdrew.
//! let withdrew = self.budget.withdraw().is_ok();
//! if !withdrew {
//! return None;
//! }
//!
//! // Try again!
//! Some(future::ready(()))
//! }
//! }
//! }
//!
//! fn clone_request(&mut self, req: &Req) -> Option<Req> {
//! Some(req.clone())
//! }
//! }
//! ```
//! Transactions Per Minute (Tps) Budget implementations
use std::{
fmt,
@ -81,29 +10,20 @@ use std::{
};
use tokio::time::Instant;
/// Represents a "budget" for retrying requests.
use super::Budget;
/// A Transactions Per Minute config for managing retry tokens.
///
/// This is useful for limiting the amount of retries a service can perform
/// over a period of time, or per a certain number of requests attempted.
/// [`TpsBudget`] uses a token bucket to decide if the request should be retried.
///
/// [`TpsBudget`] works by checking how much retries have been made in a certain period of time.
/// Minimum allowed number of retries are effectively reset on an interval. Allowed number of
/// retries depends on failed request count in recent time frame.
///
/// For more info about [`Budget`], please see the [module-level documentation].
///
/// [module-level documentation]: self
pub struct Budget {
bucket: Bucket,
deposit_amount: isize,
withdraw_amount: isize,
}
/// Indicates that it is not currently allowed to "withdraw" another retry
/// from the [`Budget`].
#[derive(Debug)]
pub struct Overdrawn {
_inner: (),
}
#[derive(Debug)]
struct Bucket {
/// [module-level documentation]: super
pub struct TpsBudget {
generation: Mutex<Generation>,
/// Initial budget allowed for every second.
reserve: isize,
@ -114,6 +34,10 @@ struct Bucket {
/// The changers for the current slot to be commited
/// after the slot expires.
writer: AtomicIsize,
/// Amount of tokens to deposit for each put().
deposit_amount: isize,
/// Amount of tokens to withdraw for each try_get().
withdraw_amount: isize,
}
#[derive(Debug)]
@ -124,10 +48,10 @@ struct Generation {
time: Instant,
}
// ===== impl Budget =====
// ===== impl TpsBudget =====
impl Budget {
/// Create a [`Budget`] that allows for a certain percent of the total
impl TpsBudget {
/// Create a [`TpsBudget`] that allows for a certain percent of the total
/// requests to be retried.
///
/// - The `ttl` is the duration of how long a single `deposit` should be
@ -173,79 +97,20 @@ impl Budget {
slots.push(AtomicIsize::new(0));
}
Budget {
bucket: Bucket {
generation: Mutex::new(Generation {
index: 0,
time: Instant::now(),
}),
reserve,
slots: slots.into_boxed_slice(),
window: ttl / windows,
writer: AtomicIsize::new(0),
},
TpsBudget {
generation: Mutex::new(Generation {
index: 0,
time: Instant::now(),
}),
reserve,
slots: slots.into_boxed_slice(),
window: ttl / windows,
writer: AtomicIsize::new(0),
deposit_amount,
withdraw_amount,
}
}
/// Store a "deposit" in the budget, which will be used to permit future
/// withdrawals.
pub fn deposit(&self) {
self.bucket.put(self.deposit_amount);
}
/// Check whether there is enough "balance" in the budget to issue a new
/// retry.
///
/// If there is not enough, an `Err(Overdrawn)` is returned.
pub fn withdraw(&self) -> Result<(), Overdrawn> {
if self.bucket.try_get(self.withdraw_amount) {
Ok(())
} else {
Err(Overdrawn { _inner: () })
}
}
}
impl Default for Budget {
fn default() -> Budget {
Budget::new(Duration::from_secs(10), 10, 0.2)
}
}
impl fmt::Debug for Budget {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Budget")
.field("deposit", &self.deposit_amount)
.field("withdraw", &self.withdraw_amount)
.field("balance", &self.bucket.sum())
.finish()
}
}
// ===== impl Bucket =====
impl Bucket {
fn put(&self, amt: isize) {
self.expire();
self.writer.fetch_add(amt, Ordering::SeqCst);
}
fn try_get(&self, amt: isize) -> bool {
debug_assert!(amt >= 0);
self.expire();
let sum = self.sum();
if sum >= amt {
self.writer.fetch_add(-amt, Ordering::SeqCst);
true
} else {
false
}
}
fn expire(&self) {
let mut gen = self.generation.lock().expect("generation lock");
@ -284,41 +149,88 @@ impl Bucket {
.saturating_add(windowed_sum)
.saturating_add(self.reserve)
}
fn put(&self, amt: isize) {
self.expire();
self.writer.fetch_add(amt, Ordering::SeqCst);
}
fn try_get(&self, amt: isize) -> bool {
debug_assert!(amt >= 0);
self.expire();
let sum = self.sum();
if sum >= amt {
self.writer.fetch_add(-amt, Ordering::SeqCst);
true
} else {
false
}
}
}
impl Budget for TpsBudget {
fn deposit(&self) {
self.put(self.deposit_amount)
}
fn withdraw(&self) -> bool {
self.try_get(self.withdraw_amount)
}
}
impl Default for TpsBudget {
fn default() -> Self {
TpsBudget::new(Duration::from_secs(10), 10, 0.2)
}
}
impl fmt::Debug for TpsBudget {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Budget")
.field("deposit", &self.deposit_amount)
.field("withdraw", &self.withdraw_amount)
.field("balance", &self.sum())
.finish()
}
}
#[cfg(test)]
mod tests {
use crate::retry::budget::Budget;
use super::*;
use tokio::time;
#[test]
fn empty() {
let bgt = Budget::new(Duration::from_secs(1), 0, 1.0);
bgt.withdraw().unwrap_err();
fn tps_empty() {
let bgt = TpsBudget::new(Duration::from_secs(1), 0, 1.0);
assert!(!bgt.withdraw());
}
#[tokio::test]
async fn leaky() {
async fn tps_leaky() {
time::pause();
let bgt = Budget::new(Duration::from_secs(1), 0, 1.0);
let bgt = TpsBudget::new(Duration::from_secs(1), 0, 1.0);
bgt.deposit();
time::advance(Duration::from_secs(3)).await;
bgt.withdraw().unwrap_err();
assert!(!bgt.withdraw());
}
#[tokio::test]
async fn slots() {
async fn tps_slots() {
time::pause();
let bgt = Budget::new(Duration::from_secs(1), 0, 0.5);
let bgt = TpsBudget::new(Duration::from_secs(1), 0, 0.5);
bgt.deposit();
bgt.deposit();
time::advance(Duration::from_millis(901)).await;
// 900ms later, the deposit should still be valid
bgt.withdraw().unwrap();
assert!(bgt.withdraw());
// blank slate
time::advance(Duration::from_millis(2001)).await;
@ -331,18 +243,18 @@ mod tests {
// the first deposit is expired, but the 2nd should still be valid,
// combining with the 3rd
bgt.withdraw().unwrap();
assert!(bgt.withdraw());
}
#[tokio::test]
async fn reserve() {
let bgt = Budget::new(Duration::from_secs(1), 5, 1.0);
bgt.withdraw().unwrap();
bgt.withdraw().unwrap();
bgt.withdraw().unwrap();
bgt.withdraw().unwrap();
bgt.withdraw().unwrap();
async fn tps_reserve() {
let bgt = TpsBudget::new(Duration::from_secs(1), 5, 1.0);
assert!(bgt.withdraw());
assert!(bgt.withdraw());
assert!(bgt.withdraw());
assert!(bgt.withdraw());
assert!(bgt.withdraw());
bgt.withdraw().unwrap_err();
assert!(!bgt.withdraw());
}
}