mirror of
https://github.com/tower-rs/tower.git
synced 2025-09-30 14:31:41 +00:00
retry: Change Policy
to accept &mut self
(#681)
This changes the `Policy` trait in the `retry` layer to accept `&mut self` instead of `&self` and changes the output type of the returned future to `()`. The motivation for this change is to simplify the trait a bit. By the trait methods having mutable references it means that for each retry request session you can mutate the local policy. This is because the policy is cloned for each individual request that arrives into the retry middleware. In addition, this allows the `Policy` trait to be object safe.
This commit is contained in:
parent
6d34340f1e
commit
aec7b8f417
@ -41,9 +41,9 @@
|
||||
//! }
|
||||
//!
|
||||
//! impl<E> Policy<Req, Res, E> for RetryPolicy {
|
||||
//! type Future = future::Ready<Self>;
|
||||
//! type Future = future::Ready<()>;
|
||||
//!
|
||||
//! fn retry(&self, req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future> {
|
||||
//! fn retry(&mut self, req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future> {
|
||||
//! match result {
|
||||
//! Ok(_) => {
|
||||
//! // Treat all `Response`s as success,
|
||||
@ -60,12 +60,12 @@
|
||||
//! }
|
||||
//!
|
||||
//! // Try again!
|
||||
//! Some(future::ready(self.clone()))
|
||||
//! Some(future::ready(()))
|
||||
//! }
|
||||
//! }
|
||||
//! }
|
||||
//!
|
||||
//! fn clone_request(&self, req: &Req) -> Option<Req> {
|
||||
//! fn clone_request(&mut self, req: &Req) -> Option<Req> {
|
||||
//! Some(req.clone())
|
||||
//! }
|
||||
//! }
|
||||
|
@ -34,11 +34,11 @@ pin_project! {
|
||||
future: F
|
||||
},
|
||||
// Polling the future from [`Policy::retry`]
|
||||
Checking {
|
||||
Waiting {
|
||||
#[pin]
|
||||
checking: P
|
||||
waiting: P
|
||||
},
|
||||
// Polling [`Service::poll_ready`] after [`Checking`] was OK.
|
||||
// Polling [`Service::poll_ready`] after [`Waiting`] was OK.
|
||||
Retrying,
|
||||
}
|
||||
}
|
||||
@ -63,8 +63,8 @@ where
|
||||
|
||||
impl<P, S, Request> Future for ResponseFuture<P, S, Request>
|
||||
where
|
||||
P: Policy<Request, S::Response, S::Error> + Clone,
|
||||
S: Service<Request> + Clone,
|
||||
P: Policy<Request, S::Response, S::Error>,
|
||||
S: Service<Request>,
|
||||
{
|
||||
type Output = Result<S::Response, S::Error>;
|
||||
|
||||
@ -77,8 +77,8 @@ where
|
||||
let mut result = ready!(future.poll(cx));
|
||||
if let Some(req) = &mut this.request {
|
||||
match this.retry.policy.retry(req, &mut result) {
|
||||
Some(checking) => {
|
||||
this.state.set(State::Checking { checking });
|
||||
Some(waiting) => {
|
||||
this.state.set(State::Waiting { waiting });
|
||||
}
|
||||
None => return Poll::Ready(result),
|
||||
}
|
||||
@ -87,12 +87,9 @@ where
|
||||
return Poll::Ready(result);
|
||||
}
|
||||
}
|
||||
StateProj::Checking { checking } => {
|
||||
this.retry
|
||||
.as_mut()
|
||||
.project()
|
||||
.policy
|
||||
.set(ready!(checking.poll(cx)));
|
||||
StateProj::Waiting { waiting } => {
|
||||
ready!(waiting.poll(cx));
|
||||
|
||||
this.state.set(State::Retrying);
|
||||
}
|
||||
StateProj::Retrying => {
|
||||
|
@ -17,9 +17,28 @@ pin_project! {
|
||||
/// Configure retrying requests of "failed" responses.
|
||||
///
|
||||
/// A [`Policy`] classifies what is a "failed" response.
|
||||
///
|
||||
/// # Clone
|
||||
///
|
||||
/// This middleware requires that the inner `Service` implements [`Clone`],
|
||||
/// because the `Service` must be stored in each [`ResponseFuture`] in
|
||||
/// order to retry the request in the event of a failure. If the inner
|
||||
/// `Service` type does not implement `Clone`, the [`Buffer`] middleware
|
||||
/// can be added to make any `Service` cloneable.
|
||||
///
|
||||
/// [`Buffer`]: crate::buffer::Buffer
|
||||
///
|
||||
/// The `Policy` must also implement `Clone`. This middleware will
|
||||
/// clone the policy for each _request session_. This means a new clone
|
||||
/// of the policy will be created for each initial request and any subsequent
|
||||
/// retries of that request. Therefore, any state stored in the `Policy` instance
|
||||
/// is for that request session only. In order to share data across request
|
||||
/// sessions, that shared state may be stored in an [`Arc`], so that all clones
|
||||
/// of the `Policy` type reference the same instance of the shared state.
|
||||
///
|
||||
/// [`Arc`]: std::sync::Arc
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Retry<P, S> {
|
||||
#[pin]
|
||||
policy: P,
|
||||
service: S,
|
||||
}
|
||||
|
@ -14,9 +14,9 @@ use std::future::Future;
|
||||
/// struct Attempts(usize);
|
||||
///
|
||||
/// impl<E> Policy<Req, Res, E> for Attempts {
|
||||
/// type Future = future::Ready<Self>;
|
||||
/// type Future = future::Ready<()>;
|
||||
///
|
||||
/// fn retry(&self, req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future> {
|
||||
/// fn retry(&mut self, req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future> {
|
||||
/// match result {
|
||||
/// Ok(_) => {
|
||||
/// // Treat all `Response`s as success,
|
||||
@ -28,7 +28,8 @@ use std::future::Future;
|
||||
/// // But we limit the number of attempts...
|
||||
/// if self.0 > 0 {
|
||||
/// // Try again!
|
||||
/// Some(future::ready(Attempts(self.0 - 1)))
|
||||
/// self.0 -= 1;
|
||||
/// Some(future::ready(()))
|
||||
/// } else {
|
||||
/// // Used all our attempts, no retry...
|
||||
/// None
|
||||
@ -37,14 +38,14 @@ use std::future::Future;
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// fn clone_request(&self, req: &Req) -> Option<Req> {
|
||||
/// fn clone_request(&mut self, req: &Req) -> Option<Req> {
|
||||
/// Some(req.clone())
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
pub trait Policy<Req, Res, E>: Sized {
|
||||
pub trait Policy<Req, Res, E> {
|
||||
/// The [`Future`] type returned by [`Policy::retry`].
|
||||
type Future: Future<Output = Self>;
|
||||
type Future: Future<Output = ()>;
|
||||
|
||||
/// Check the policy if a certain request should be retried.
|
||||
///
|
||||
@ -53,8 +54,10 @@ pub trait Policy<Req, Res, E>: Sized {
|
||||
///
|
||||
/// If the request should **not** be retried, return `None`.
|
||||
///
|
||||
/// If the request *should* be retried, return `Some` future of a new
|
||||
/// policy that would apply for the next request attempt.
|
||||
/// If the request *should* be retried, return `Some` future that will delay
|
||||
/// the next retry of the request. This can be used to sleep for a certain
|
||||
/// duration, to wait for some external condition to be met before retrying,
|
||||
/// or resolve right away, if the request should be retried immediately.
|
||||
///
|
||||
/// ## Mutating Requests
|
||||
///
|
||||
@ -77,10 +80,14 @@ pub trait Policy<Req, Res, E>: Sized {
|
||||
///
|
||||
/// [`Service::Response`]: crate::Service::Response
|
||||
/// [`Service::Error`]: crate::Service::Error
|
||||
fn retry(&self, req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future>;
|
||||
fn retry(&mut self, req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future>;
|
||||
|
||||
/// Tries to clone a request before being passed to the inner service.
|
||||
///
|
||||
/// If the request cannot be cloned, return [`None`].
|
||||
fn clone_request(&self, req: &Req) -> Option<Req>;
|
||||
fn clone_request(&mut self, req: &Req) -> Option<Req>;
|
||||
}
|
||||
|
||||
// Ensure `Policy` is object safe
|
||||
#[cfg(test)]
|
||||
fn _obj_safe(_: Box<dyn Policy<(), (), (), Future = futures::future::Ready<()>>>) {}
|
||||
|
@ -43,13 +43,13 @@ where
|
||||
Req: Clone,
|
||||
E: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
|
||||
{
|
||||
type Future = Ready<Self>;
|
||||
type Future = Ready<()>;
|
||||
|
||||
fn retry(&self, _req: &mut Req, _result: &mut Result<Res, E>) -> Option<Self::Future> {
|
||||
fn retry(&mut self, _req: &mut Req, _result: &mut Result<Res, E>) -> Option<Self::Future> {
|
||||
None
|
||||
}
|
||||
|
||||
fn clone_request(&self, req: &Req) -> Option<Req> {
|
||||
fn clone_request(&mut self, req: &Req) -> Option<Req> {
|
||||
Some(req.clone())
|
||||
}
|
||||
}
|
||||
|
@ -122,16 +122,16 @@ type Handle = mock::Handle<Req, Res>;
|
||||
struct RetryErrors;
|
||||
|
||||
impl Policy<Req, Res, Error> for RetryErrors {
|
||||
type Future = future::Ready<Self>;
|
||||
fn retry(&self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
|
||||
type Future = future::Ready<()>;
|
||||
fn retry(&mut self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
|
||||
if result.is_err() {
|
||||
Some(future::ready(RetryErrors))
|
||||
Some(future::ready(()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn clone_request(&self, req: &Req) -> Option<Req> {
|
||||
fn clone_request(&mut self, req: &Req) -> Option<Req> {
|
||||
Some(*req)
|
||||
}
|
||||
}
|
||||
@ -140,16 +140,17 @@ impl Policy<Req, Res, Error> for RetryErrors {
|
||||
struct Limit(usize);
|
||||
|
||||
impl Policy<Req, Res, Error> for Limit {
|
||||
type Future = future::Ready<Self>;
|
||||
fn retry(&self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
|
||||
type Future = future::Ready<()>;
|
||||
fn retry(&mut self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
|
||||
if result.is_err() && self.0 > 0 {
|
||||
Some(future::ready(Limit(self.0 - 1)))
|
||||
self.0 -= 1;
|
||||
Some(future::ready(()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn clone_request(&self, req: &Req) -> Option<Req> {
|
||||
fn clone_request(&mut self, req: &Req) -> Option<Req> {
|
||||
Some(*req)
|
||||
}
|
||||
}
|
||||
@ -158,18 +159,18 @@ impl Policy<Req, Res, Error> for Limit {
|
||||
struct UnlessErr(InnerError);
|
||||
|
||||
impl Policy<Req, Res, Error> for UnlessErr {
|
||||
type Future = future::Ready<Self>;
|
||||
fn retry(&self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
|
||||
type Future = future::Ready<()>;
|
||||
fn retry(&mut self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
|
||||
result.as_ref().err().and_then(|err| {
|
||||
if err.to_string() != self.0 {
|
||||
Some(future::ready(self.clone()))
|
||||
Some(future::ready(()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn clone_request(&self, req: &Req) -> Option<Req> {
|
||||
fn clone_request(&mut self, req: &Req) -> Option<Req> {
|
||||
Some(*req)
|
||||
}
|
||||
}
|
||||
@ -178,12 +179,12 @@ impl Policy<Req, Res, Error> for UnlessErr {
|
||||
struct CannotClone;
|
||||
|
||||
impl Policy<Req, Res, Error> for CannotClone {
|
||||
type Future = future::Ready<Self>;
|
||||
fn retry(&self, _: &mut Req, _: &mut Result<Res, Error>) -> Option<Self::Future> {
|
||||
type Future = future::Ready<()>;
|
||||
fn retry(&mut self, _: &mut Req, _: &mut Result<Res, Error>) -> Option<Self::Future> {
|
||||
unreachable!("retry cannot be called since request isn't cloned");
|
||||
}
|
||||
|
||||
fn clone_request(&self, _req: &Req) -> Option<Req> {
|
||||
fn clone_request(&mut self, _req: &Req) -> Option<Req> {
|
||||
None
|
||||
}
|
||||
}
|
||||
@ -199,21 +200,20 @@ impl Policy<Req, Res, Error> for MutatingPolicy
|
||||
where
|
||||
Error: From<&'static str>,
|
||||
{
|
||||
type Future = future::Ready<Self>;
|
||||
type Future = future::Ready<()>;
|
||||
|
||||
fn retry(&self, req: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
|
||||
fn retry(&mut self, req: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
|
||||
if self.remaining == 0 {
|
||||
*result = Err("out of retries".into());
|
||||
None
|
||||
} else {
|
||||
*req = "retrying";
|
||||
Some(future::ready(MutatingPolicy {
|
||||
remaining: self.remaining - 1,
|
||||
}))
|
||||
self.remaining -= 1;
|
||||
Some(future::ready(()))
|
||||
}
|
||||
}
|
||||
|
||||
fn clone_request(&self, req: &Req) -> Option<Req> {
|
||||
fn clone_request(&mut self, req: &Req) -> Option<Req> {
|
||||
Some(*req)
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user