From d27ba65891590b848fa9ba13a202d5d4aa5eda81 Mon Sep 17 00:00:00 2001 From: boraarslan Date: Mon, 24 Oct 2022 21:28:22 +0300 Subject: [PATCH] retry: Add `Budget` trait (#703) --- tower/src/retry/budget/mod.rs | 91 ++++++ .../retry/{budget.rs => budget/tps_budget.rs} | 272 ++++++------------ 2 files changed, 183 insertions(+), 180 deletions(-) create mode 100644 tower/src/retry/budget/mod.rs rename tower/src/retry/{budget.rs => budget/tps_budget.rs} (50%) diff --git a/tower/src/retry/budget/mod.rs b/tower/src/retry/budget/mod.rs new file mode 100644 index 00000000..3d1d2b87 --- /dev/null +++ b/tower/src/retry/budget/mod.rs @@ -0,0 +1,91 @@ +//! A retry "budget" for allowing only a certain amount of retries over time. +//! +//! # Why budgets and not max retries? +//! +//! The most common way of configuring retries is to specify a maximum +//! number of retry attempts to perform before giving up. This is a familiar idea to anyone +//! who’s used a web browser: you try to load a webpage, and if it doesn’t load, you try again. +//! If it still doesn’t load, you try a third time. Finally you give up. +//! +//! Unfortunately, there are at least two problems with configuring retries this way: +//! +//! **Choosing the maximum number of retry attempts is a guessing game.** +//! You need to pick a number that’s high enough to make a difference when things are somewhat failing, +//! but not so high that it generates extra load on the system when it’s really failing. In practice, +//! you usually pick a maximum retry attempts number out of a hat (e.g. 3) and hope for the best. +//! +//! **Systems configured this way are vulnerable to retry storms.** +//! A retry storm begins when one service starts to experience a larger than normal failure rate. +//! This causes its clients to retry those failed requests. The extra load from the retries causes the +//! service to slow down further and fail more requests, triggering more retries. If each client is +//! 
configured to retry up to 3 times, this can quadruple the number of requests being sent! To make +//! matters even worse, if any of the clients’ clients are configured with retries, the number of retries +//! compounds multiplicatively and can turn a small number of errors into a self-inflicted denial of service attack. +//! +//! It's generally dangerous to implement retries without some limiting factor. [`Budget`]s are that limit. +//! +//! # Examples +//! +//! ```rust +//! use std::sync::Arc; +//! +//! use futures_util::future; +//! use tower::retry::{budget::{Budget, TpsBudget}, Policy}; +//! +//! type Req = String; +//! type Res = String; +//! +//! #[derive(Clone, Debug)] +//! struct RetryPolicy { +//! budget: Arc<TpsBudget>, +//! } +//! +//! impl<E> Policy<Req, Res, E> for RetryPolicy { +//! type Future = future::Ready<()>; +//! +//! fn retry(&mut self, req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future> { +//! match result { +//! Ok(_) => { +//! // Treat all `Response`s as success, +//! // so deposit budget and don't retry... +//! self.budget.deposit(); +//! None +//! } +//! Err(_) => { +//! // Treat all errors as failures... +//! // Withdraw the budget, don't retry if we overdrew. +//! let withdrew = self.budget.withdraw(); +//! if !withdrew { +//! return None; +//! } +//! +//! // Try again! +//! Some(future::ready(())) +//! } +//! } +//! } +//! +//! fn clone_request(&mut self, req: &Req) -> Option<Req> { +//! Some(req.clone()) +//! } +//! } +//! ``` + +pub mod tps_budget; + +pub use tps_budget::TpsBudget; + +/// For more info about [`Budget`], please see the [module-level documentation]. +/// +/// [module-level documentation]: self +pub trait Budget { + /// Store a "deposit" in the budget, which will be used to permit future + /// withdrawals. + fn deposit(&self); + + /// Check whether there is enough "balance" in the budget to issue a new + /// retry. + /// + /// If there is not enough, false is returned. 
+ fn withdraw(&self) -> bool; +} diff --git a/tower/src/retry/budget.rs b/tower/src/retry/budget/tps_budget.rs similarity index 50% rename from tower/src/retry/budget.rs rename to tower/src/retry/budget/tps_budget.rs index 35f7fa7e..3f1d530b 100644 --- a/tower/src/retry/budget.rs +++ b/tower/src/retry/budget/tps_budget.rs @@ -1,75 +1,4 @@ -//! A retry "budget" for allowing only a certain amount of retries over time. -//! -//! # Why budgets and not max retries? -//! -//! The most common way of configuring retries is to specify a maximum -//! number of retry attempts to perform before giving up. This is a familiar idea to anyone -//! who’s used a web browser: you try to load a webpage, and if it doesn’t load, you try again. -//! If it still doesn’t load, you try a third time. Finally you give up. -//! -//! Unfortunately, there are at least two problems with configuring retries this way: -//! -//! **Choosing the maximum number of retry attempts is a guessing game.** -//! You need to pick a number that’s high enough to make a difference when things are somewhat failing, -//! but not so high that it generates extra load on the system when it’s really failing. In practice, -//! you usually pick a maximum retry attempts number out of a hat (e.g. 3) and hope for the best. -//! -//! **Systems configured this way are vulnerable to retry storms.** -//! A retry storm begins when one service starts to experience a larger than normal failure rate. -//! This causes its clients to retry those failed requests. The extra load from the retries causes the -//! service to slow down further and fail more requests, triggering more retries. If each client is -//! configured to retry up to 3 times, this can quadruple the number of requests being sent! To make -//! matters even worse, if any of the clients’ clients are configured with retries, the number of retries -//! compounds multiplicatively and can turn a small number of errors into a self-inflicted denial of service attack. -//! -//! 
It's generally dangerous to implement retries without some limiting factor. [`Budget`]s are that limit. -//! -//! # Examples -//! -//! ```rust -//! use std::sync::Arc; -//! -//! use futures_util::future; -//! use tower::retry::{budget::Budget, Policy}; -//! -//! type Req = String; -//! type Res = String; -//! -//! #[derive(Clone, Debug)] -//! struct RetryPolicy { -//! budget: Arc<Budget>, -//! } -//! -//! impl<E> Policy<Req, Res, E> for RetryPolicy { -//! type Future = future::Ready<()>; -//! -//! fn retry(&mut self, req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future> { -//! match result { -//! Ok(_) => { -//! // Treat all `Response`s as success, -//! // so deposit budget and don't retry... -//! self.budget.deposit(); -//! None -//! } -//! Err(_) => { -//! // Treat all errors as failures... -//! // Withdraw the budget, don't retry if we overdrew. -//! let withdrew = self.budget.withdraw().is_ok(); -//! if !withdrew { -//! return None; -//! } -//! -//! // Try again! -//! Some(future::ready(())) -//! } -//! } -//! } -//! -//! fn clone_request(&mut self, req: &Req) -> Option<Req> { -//! Some(req.clone()) -//! } -//! } -//! ``` +//! Transactions Per Second (TPS) budget implementation use std::{ fmt, @@ -81,29 +10,20 @@ use std::{ }; use tokio::time::Instant; -/// Represents a "budget" for retrying requests. +use super::Budget; + +/// A Transactions Per Second (TPS) config for managing retry tokens. /// -/// This is useful for limiting the amount of retries a service can perform -/// over a period of time, or per a certain number of requests attempted. +/// [`TpsBudget`] uses a token bucket to decide if the request should be retried. +/// +/// [`TpsBudget`] works by checking how many retries have been made in a certain period of time. +/// The minimum allowed number of retries is effectively reset on an interval. The allowed number of +/// retries depends on the failed request count in the recent time frame. /// /// For more info about [`Budget`], please see the [module-level documentation]. 
/// -/// [module-level documentation]: self -pub struct Budget { - bucket: Bucket, - deposit_amount: isize, - withdraw_amount: isize, -} - -/// Indicates that it is not currently allowed to "withdraw" another retry -/// from the [`Budget`]. -#[derive(Debug)] -pub struct Overdrawn { - _inner: (), -} - -#[derive(Debug)] -struct Bucket { +/// [module-level documentation]: super +pub struct TpsBudget { generation: Mutex, /// Initial budget allowed for every second. reserve: isize, @@ -114,6 +34,10 @@ struct Bucket { /// The changers for the current slot to be commited /// after the slot expires. writer: AtomicIsize, + /// Amount of tokens to deposit for each put(). + deposit_amount: isize, + /// Amount of tokens to withdraw for each try_get(). + withdraw_amount: isize, } #[derive(Debug)] @@ -124,10 +48,10 @@ struct Generation { time: Instant, } -// ===== impl Budget ===== +// ===== impl TpsBudget ===== -impl Budget { - /// Create a [`Budget`] that allows for a certain percent of the total +impl TpsBudget { + /// Create a [`TpsBudget`] that allows for a certain percent of the total /// requests to be retried. /// /// - The `ttl` is the duration of how long a single `deposit` should be @@ -173,79 +97,20 @@ impl Budget { slots.push(AtomicIsize::new(0)); } - Budget { - bucket: Bucket { - generation: Mutex::new(Generation { - index: 0, - time: Instant::now(), - }), - reserve, - slots: slots.into_boxed_slice(), - window: ttl / windows, - writer: AtomicIsize::new(0), - }, + TpsBudget { + generation: Mutex::new(Generation { + index: 0, + time: Instant::now(), + }), + reserve, + slots: slots.into_boxed_slice(), + window: ttl / windows, + writer: AtomicIsize::new(0), deposit_amount, withdraw_amount, } } - /// Store a "deposit" in the budget, which will be used to permit future - /// withdrawals. - pub fn deposit(&self) { - self.bucket.put(self.deposit_amount); - } - - /// Check whether there is enough "balance" in the budget to issue a new - /// retry. 
- /// - /// If there is not enough, an `Err(Overdrawn)` is returned. - pub fn withdraw(&self) -> Result<(), Overdrawn> { - if self.bucket.try_get(self.withdraw_amount) { - Ok(()) - } else { - Err(Overdrawn { _inner: () }) - } - } -} - -impl Default for Budget { - fn default() -> Budget { - Budget::new(Duration::from_secs(10), 10, 0.2) - } -} - -impl fmt::Debug for Budget { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Budget") - .field("deposit", &self.deposit_amount) - .field("withdraw", &self.withdraw_amount) - .field("balance", &self.bucket.sum()) - .finish() - } -} - -// ===== impl Bucket ===== - -impl Bucket { - fn put(&self, amt: isize) { - self.expire(); - self.writer.fetch_add(amt, Ordering::SeqCst); - } - - fn try_get(&self, amt: isize) -> bool { - debug_assert!(amt >= 0); - - self.expire(); - - let sum = self.sum(); - if sum >= amt { - self.writer.fetch_add(-amt, Ordering::SeqCst); - true - } else { - false - } - } - fn expire(&self) { let mut gen = self.generation.lock().expect("generation lock"); @@ -284,41 +149,88 @@ impl Bucket { .saturating_add(windowed_sum) .saturating_add(self.reserve) } + + fn put(&self, amt: isize) { + self.expire(); + self.writer.fetch_add(amt, Ordering::SeqCst); + } + + fn try_get(&self, amt: isize) -> bool { + debug_assert!(amt >= 0); + + self.expire(); + + let sum = self.sum(); + if sum >= amt { + self.writer.fetch_add(-amt, Ordering::SeqCst); + true + } else { + false + } + } +} + +impl Budget for TpsBudget { + fn deposit(&self) { + self.put(self.deposit_amount) + } + + fn withdraw(&self) -> bool { + self.try_get(self.withdraw_amount) + } +} + +impl Default for TpsBudget { + fn default() -> Self { + TpsBudget::new(Duration::from_secs(10), 10, 0.2) + } +} + +impl fmt::Debug for TpsBudget { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Budget") + .field("deposit", &self.deposit_amount) + .field("withdraw", &self.withdraw_amount) + .field("balance", &self.sum()) + 
.finish() + } } #[cfg(test)] mod tests { + use crate::retry::budget::Budget; + use super::*; use tokio::time; #[test] - fn empty() { - let bgt = Budget::new(Duration::from_secs(1), 0, 1.0); - bgt.withdraw().unwrap_err(); + fn tps_empty() { + let bgt = TpsBudget::new(Duration::from_secs(1), 0, 1.0); + assert!(!bgt.withdraw()); } #[tokio::test] - async fn leaky() { + async fn tps_leaky() { time::pause(); - let bgt = Budget::new(Duration::from_secs(1), 0, 1.0); + let bgt = TpsBudget::new(Duration::from_secs(1), 0, 1.0); bgt.deposit(); time::advance(Duration::from_secs(3)).await; - bgt.withdraw().unwrap_err(); + assert!(!bgt.withdraw()); } #[tokio::test] - async fn slots() { + async fn tps_slots() { time::pause(); - let bgt = Budget::new(Duration::from_secs(1), 0, 0.5); + let bgt = TpsBudget::new(Duration::from_secs(1), 0, 0.5); bgt.deposit(); bgt.deposit(); time::advance(Duration::from_millis(901)).await; // 900ms later, the deposit should still be valid - bgt.withdraw().unwrap(); + assert!(bgt.withdraw()); // blank slate time::advance(Duration::from_millis(2001)).await; @@ -331,18 +243,18 @@ mod tests { // the first deposit is expired, but the 2nd should still be valid, // combining with the 3rd - bgt.withdraw().unwrap(); + assert!(bgt.withdraw()); } #[tokio::test] - async fn reserve() { - let bgt = Budget::new(Duration::from_secs(1), 5, 1.0); - bgt.withdraw().unwrap(); - bgt.withdraw().unwrap(); - bgt.withdraw().unwrap(); - bgt.withdraw().unwrap(); - bgt.withdraw().unwrap(); + async fn tps_reserve() { + let bgt = TpsBudget::new(Duration::from_secs(1), 5, 1.0); + assert!(bgt.withdraw()); + assert!(bgt.withdraw()); + assert!(bgt.withdraw()); + assert!(bgt.withdraw()); + assert!(bgt.withdraw()); - bgt.withdraw().unwrap_err(); + assert!(!bgt.withdraw()); } }