mirror of
https://github.com/tower-rs/tower.git
synced 2026-04-22 15:06:59 +00:00
Update retry and prepare for release (#376)
* Update retry and prepare for release * fmt
This commit is contained in:
@@ -13,7 +13,7 @@ members = [
|
||||
# "tower-load-shed",
|
||||
# "tower-ready-cache",
|
||||
"tower-reconnect",
|
||||
# "tower-retry",
|
||||
"tower-retry",
|
||||
"tower-service",
|
||||
# "tower-spawn-ready",
|
||||
"tower-test",
|
||||
|
||||
@@ -1,3 +1,10 @@
|
||||
# 0.3.0 (December 4, 2019)
|
||||
|
||||
- Update to `tower-service 0.3`
|
||||
- Update to `tower-layer 0.3`
|
||||
- Update to `tokio 0.2`
|
||||
- Update to `futures-core 0.3`
|
||||
|
||||
# 0.3.0-alpha.2 (September 30, 2019)
|
||||
|
||||
- Move to `futures-*-preview 0.3.0-alpha.19`
|
||||
|
||||
@@ -8,13 +8,13 @@ name = "tower-retry"
|
||||
# - README.md
|
||||
# - Update CHANGELOG.md.
|
||||
# - Create "v0.1.x" git tag.
|
||||
version = "0.3.0-alpha.2"
|
||||
version = "0.3.0"
|
||||
authors = ["Tower Maintainers <team@tower-rs.com>"]
|
||||
license = "MIT"
|
||||
readme = "README.md"
|
||||
repository = "https://github.com/tower-rs/tower"
|
||||
homepage = "https://github.com/tower-rs/tower"
|
||||
documentation = "https://docs.rs/tower-retry/0.3.0-alpha.2"
|
||||
documentation = "https://docs.rs/tower-retry/0.3.0"
|
||||
description = """
|
||||
Retry failed requests.
|
||||
"""
|
||||
@@ -22,13 +22,14 @@ categories = ["asynchronous", "network-programming"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
tower-service = { version = "=0.3.0-alpha.2", path = "../tower-service" }
|
||||
tower-layer = { version = "=0.3.0-alpha.2", path = "../tower-layer" }
|
||||
tokio-timer = "=0.3.0-alpha.6"
|
||||
tower-service = { version = "0.3", path = "../tower-service" }
|
||||
tower-layer = { version = "0.3", path = "../tower-layer" }
|
||||
tokio = { version = "0.2", features = ["time"] }
|
||||
pin-project = "0.4"
|
||||
futures-core-preview = "=0.3.0-alpha.19"
|
||||
futures-core = "0.3"
|
||||
|
||||
[dev-dependencies]
|
||||
tower-test = { version = "=0.3.0-alpha.2", path = "../tower-test" }
|
||||
tokio-test = "=0.2.0-alpha.6"
|
||||
futures-util-preview = "=0.3.0-alpha.19"
|
||||
tower-test = { version = "0.3", path = "../tower-test" }
|
||||
tokio = { version = "0.2", features = ["macros", "test-util"] }
|
||||
tokio-test = "0.2"
|
||||
futures-util = "0.3"
|
||||
|
||||
@@ -6,9 +6,9 @@ use std::{
|
||||
atomic::{AtomicIsize, Ordering},
|
||||
Mutex,
|
||||
},
|
||||
time::{Duration, Instant},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio_timer::clock;
|
||||
use tokio::time::Instant;
|
||||
|
||||
/// Represents a "budget" for retrying requests.
|
||||
///
|
||||
@@ -102,7 +102,7 @@ impl Budget {
|
||||
bucket: Bucket {
|
||||
generation: Mutex::new(Generation {
|
||||
index: 0,
|
||||
time: clock::now(),
|
||||
time: Instant::now(),
|
||||
}),
|
||||
reserve,
|
||||
slots: slots.into_boxed_slice(),
|
||||
@@ -174,7 +174,7 @@ impl Bucket {
|
||||
fn expire(&self) {
|
||||
let mut gen = self.generation.lock().expect("generation lock");
|
||||
|
||||
let now = clock::now();
|
||||
let now = Instant::now();
|
||||
let diff = now - gen.time;
|
||||
if diff < self.window {
|
||||
// not expired yet
|
||||
@@ -214,7 +214,7 @@ impl Bucket {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tokio_test::clock;
|
||||
use tokio::time;
|
||||
|
||||
#[test]
|
||||
fn empty() {
|
||||
@@ -222,54 +222,52 @@ mod tests {
|
||||
bgt.withdraw().unwrap_err();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn leaky() {
|
||||
clock::mock(|time| {
|
||||
let bgt = Budget::new(Duration::from_secs(1), 0, 1.0);
|
||||
bgt.deposit();
|
||||
#[tokio::test]
|
||||
async fn leaky() {
|
||||
time::pause();
|
||||
|
||||
time.advance(Duration::from_secs(3));
|
||||
let bgt = Budget::new(Duration::from_secs(1), 0, 1.0);
|
||||
bgt.deposit();
|
||||
|
||||
bgt.withdraw().unwrap_err();
|
||||
});
|
||||
time::advance(Duration::from_secs(3)).await;
|
||||
|
||||
bgt.withdraw().unwrap_err();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn slots() {
|
||||
clock::mock(|time| {
|
||||
let bgt = Budget::new(Duration::from_secs(1), 0, 0.5);
|
||||
bgt.deposit();
|
||||
bgt.deposit();
|
||||
time.advance(Duration::from_millis(900));
|
||||
// 900ms later, the deposit should still be valid
|
||||
bgt.withdraw().unwrap();
|
||||
#[tokio::test]
|
||||
async fn slots() {
|
||||
time::pause();
|
||||
|
||||
// blank slate
|
||||
time.advance(Duration::from_millis(2000));
|
||||
let bgt = Budget::new(Duration::from_secs(1), 0, 0.5);
|
||||
bgt.deposit();
|
||||
bgt.deposit();
|
||||
time::advance(Duration::from_millis(901)).await;
|
||||
// 900ms later, the deposit should still be valid
|
||||
bgt.withdraw().unwrap();
|
||||
|
||||
bgt.deposit();
|
||||
time.advance(Duration::from_millis(300));
|
||||
bgt.deposit();
|
||||
time.advance(Duration::from_millis(800));
|
||||
bgt.deposit();
|
||||
// blank slate
|
||||
time::advance(Duration::from_millis(2001)).await;
|
||||
|
||||
// the first deposit is expired, but the 2nd should still be valid,
|
||||
// combining with the 3rd
|
||||
bgt.withdraw().unwrap();
|
||||
});
|
||||
bgt.deposit();
|
||||
time::advance(Duration::from_millis(301)).await;
|
||||
bgt.deposit();
|
||||
time::advance(Duration::from_millis(801)).await;
|
||||
bgt.deposit();
|
||||
|
||||
// the first deposit is expired, but the 2nd should still be valid,
|
||||
// combining with the 3rd
|
||||
bgt.withdraw().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reserve() {
|
||||
clock::mock(|_| {
|
||||
let bgt = Budget::new(Duration::from_secs(1), 5, 1.0);
|
||||
bgt.withdraw().unwrap();
|
||||
bgt.withdraw().unwrap();
|
||||
bgt.withdraw().unwrap();
|
||||
bgt.withdraw().unwrap();
|
||||
bgt.withdraw().unwrap();
|
||||
#[tokio::test]
|
||||
async fn reserve() {
|
||||
let bgt = Budget::new(Duration::from_secs(1), 5, 1.0);
|
||||
bgt.withdraw().unwrap();
|
||||
bgt.withdraw().unwrap();
|
||||
bgt.withdraw().unwrap();
|
||||
bgt.withdraw().unwrap();
|
||||
bgt.withdraw().unwrap();
|
||||
|
||||
bgt.withdraw().unwrap_err();
|
||||
});
|
||||
bgt.withdraw().unwrap_err();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#![doc(html_root_url = "https://docs.rs/tower-retry/0.3.0-alpha.2")]
|
||||
#![doc(html_root_url = "https://docs.rs/tower-retry/0.3.0")]
|
||||
#![warn(
|
||||
missing_debug_implementations,
|
||||
missing_docs,
|
||||
|
||||
@@ -1,101 +1,79 @@
|
||||
use futures_util::{future, pin_mut};
|
||||
use std::future::Future;
|
||||
use futures_util::future;
|
||||
use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok, task};
|
||||
use tower_retry::Policy;
|
||||
use tower_service::Service;
|
||||
use tower_test::{assert_request_eq, mock};
|
||||
|
||||
#[test]
|
||||
fn retry_errors() {
|
||||
task::mock(|cx| {
|
||||
let (mut service, handle) = new_service(RetryErrors);
|
||||
pin_mut!(handle);
|
||||
#[tokio::test]
|
||||
async fn retry_errors() {
|
||||
let (mut service, mut handle) = new_service(RetryErrors);
|
||||
|
||||
assert_ready_ok!(service.poll_ready(cx));
|
||||
assert_ready_ok!(service.poll_ready());
|
||||
|
||||
let fut = service.call("hello");
|
||||
pin_mut!(fut);
|
||||
let mut fut = task::spawn(service.call("hello"));
|
||||
|
||||
assert_request_eq!(handle, "hello").send_error("retry me");
|
||||
assert_request_eq!(handle, "hello").send_error("retry me");
|
||||
|
||||
assert_pending!(fut.as_mut().poll(cx));
|
||||
assert_pending!(fut.poll());
|
||||
|
||||
assert_request_eq!(handle, "hello").send_response("world");
|
||||
assert_request_eq!(handle, "hello").send_response("world");
|
||||
|
||||
assert_ready_ok!(fut.poll(cx), "world");
|
||||
});
|
||||
assert_eq!(fut.into_inner().await.unwrap(), "world");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn retry_limit() {
|
||||
task::mock(|cx| {
|
||||
let (mut service, handle) = new_service(Limit(2));
|
||||
pin_mut!(handle);
|
||||
#[tokio::test]
|
||||
async fn retry_limit() {
|
||||
let (mut service, mut handle) = new_service(Limit(2));
|
||||
|
||||
assert_ready_ok!(service.poll_ready(cx));
|
||||
assert_ready_ok!(service.poll_ready());
|
||||
|
||||
let fut = service.call("hello");
|
||||
pin_mut!(fut);
|
||||
let mut fut = task::spawn(service.call("hello"));
|
||||
|
||||
assert_request_eq!(handle, "hello").send_error("retry 1");
|
||||
assert_pending!(fut.as_mut().poll(cx));
|
||||
assert_request_eq!(handle, "hello").send_error("retry 1");
|
||||
assert_pending!(fut.poll());
|
||||
|
||||
assert_request_eq!(handle, "hello").send_error("retry 2");
|
||||
assert_pending!(fut.as_mut().poll(cx));
|
||||
assert_request_eq!(handle, "hello").send_error("retry 2");
|
||||
assert_pending!(fut.poll());
|
||||
|
||||
assert_request_eq!(handle, "hello").send_error("retry 3");
|
||||
assert_eq!(assert_ready_err!(fut.poll(cx)).to_string(), "retry 3");
|
||||
});
|
||||
assert_request_eq!(handle, "hello").send_error("retry 3");
|
||||
assert_eq!(assert_ready_err!(fut.poll()).to_string(), "retry 3");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn retry_error_inspection() {
|
||||
task::mock(|cx| {
|
||||
let (mut service, handle) = new_service(UnlessErr("reject"));
|
||||
pin_mut!(handle);
|
||||
#[tokio::test]
|
||||
async fn retry_error_inspection() {
|
||||
let (mut service, mut handle) = new_service(UnlessErr("reject"));
|
||||
|
||||
assert_ready_ok!(service.poll_ready(cx));
|
||||
let fut = service.call("hello");
|
||||
pin_mut!(fut);
|
||||
assert_ready_ok!(service.poll_ready());
|
||||
let mut fut = task::spawn(service.call("hello"));
|
||||
|
||||
assert_request_eq!(handle, "hello").send_error("retry 1");
|
||||
assert_pending!(fut.as_mut().poll(cx));
|
||||
assert_request_eq!(handle, "hello").send_error("retry 1");
|
||||
assert_pending!(fut.poll());
|
||||
|
||||
assert_request_eq!(handle, "hello").send_error("reject");
|
||||
assert_eq!(assert_ready_err!(fut.poll(cx)).to_string(), "reject");
|
||||
});
|
||||
assert_request_eq!(handle, "hello").send_error("reject");
|
||||
assert_eq!(assert_ready_err!(fut.poll()).to_string(), "reject");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn retry_cannot_clone_request() {
|
||||
task::mock(|cx| {
|
||||
let (mut service, handle) = new_service(CannotClone);
|
||||
pin_mut!(handle);
|
||||
#[tokio::test]
|
||||
async fn retry_cannot_clone_request() {
|
||||
let (mut service, mut handle) = new_service(CannotClone);
|
||||
|
||||
assert_ready_ok!(service.poll_ready(cx));
|
||||
let fut = service.call("hello");
|
||||
pin_mut!(fut);
|
||||
assert_ready_ok!(service.poll_ready());
|
||||
let mut fut = task::spawn(service.call("hello"));
|
||||
|
||||
assert_request_eq!(handle, "hello").send_error("retry 1");
|
||||
assert_eq!(assert_ready_err!(fut.poll(cx)).to_string(), "retry 1");
|
||||
});
|
||||
assert_request_eq!(handle, "hello").send_error("retry 1");
|
||||
assert_eq!(assert_ready_err!(fut.poll()).to_string(), "retry 1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn success_with_cannot_clone() {
|
||||
task::mock(|cx| {
|
||||
// Even though the request couldn't be cloned, if the first request succeeds,
|
||||
// it should succeed overall.
|
||||
let (mut service, handle) = new_service(CannotClone);
|
||||
pin_mut!(handle);
|
||||
#[tokio::test]
|
||||
async fn success_with_cannot_clone() {
|
||||
// Even though the request couldn't be cloned, if the first request succeeds,
|
||||
// it should succeed overall.
|
||||
let (mut service, mut handle) = new_service(CannotClone);
|
||||
|
||||
assert_ready_ok!(service.poll_ready(cx));
|
||||
let fut = service.call("hello");
|
||||
pin_mut!(fut);
|
||||
assert_ready_ok!(service.poll_ready());
|
||||
let mut fut = task::spawn(service.call("hello"));
|
||||
|
||||
assert_request_eq!(handle, "hello").send_response("world");
|
||||
assert_ready_ok!(fut.poll(cx), "world");
|
||||
});
|
||||
assert_request_eq!(handle, "hello").send_response("world");
|
||||
assert_ready_ok!(fut.poll(), "world");
|
||||
}
|
||||
|
||||
type Req = &'static str;
|
||||
@@ -177,8 +155,7 @@ impl Policy<Req, Res, Error> for CannotClone {
|
||||
|
||||
fn new_service<P: Policy<Req, Res, Error> + Clone>(
|
||||
policy: P,
|
||||
) -> (tower_retry::Retry<P, Mock>, Handle) {
|
||||
let (service, handle) = mock::pair();
|
||||
let service = tower_retry::Retry::new(policy, service);
|
||||
(service, handle)
|
||||
) -> (mock::Spawn<tower_retry::Retry<P, Mock>>, Handle) {
|
||||
let retry = tower_retry::RetryLayer::new(policy);
|
||||
mock::spawn_layer(retry)
|
||||
}
|
||||
|
||||
@@ -4,7 +4,9 @@ pub mod error;
|
||||
pub mod future;
|
||||
pub mod spawn;
|
||||
|
||||
use crate::mock::{error::Error, future::ResponseFuture, spawn::Spawn};
|
||||
pub use spawn::Spawn;
|
||||
|
||||
use crate::mock::{error::Error, future::ResponseFuture};
|
||||
use core::task::Waker;
|
||||
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
Reference in New Issue
Block a user