mirror of
https://github.com/tower-rs/tower.git
synced 2025-09-27 13:00:43 +00:00
Compare commits
27 Commits
tower-serv
...
master
Author | SHA1 | Date | |
---|---|---|---|
![]() |
a1c277bc90 | ||
![]() |
cd2bfa0f58 | ||
![]() |
9e901d450d | ||
![]() |
d21cdbf044 | ||
![]() |
fcef5928a2 | ||
![]() |
fe3156587c | ||
![]() |
50d839d3b0 | ||
![]() |
9754acb5dc | ||
![]() |
b79c0c7497 | ||
![]() |
6a3ab07b4c | ||
![]() |
ec81e5797b | ||
![]() |
81658e65ad | ||
![]() |
abb375d08c | ||
![]() |
6c8d98b470 | ||
![]() |
fb646693bf | ||
![]() |
ee149f0170 | ||
![]() |
aade4e34ff | ||
![]() |
954e4c7e8d | ||
![]() |
34a6951a46 | ||
![]() |
7dc533ef86 | ||
![]() |
a09fd9742d | ||
![]() |
f57e31b0e6 | ||
![]() |
da24532017 | ||
![]() |
6283f3aff1 | ||
![]() |
71551010ac | ||
![]() |
b2c48b46a3 | ||
![]() |
fec9e559e2 |
8
.github/workflows/CI.yml
vendored
8
.github/workflows/CI.yml
vendored
@ -41,7 +41,9 @@ jobs:
|
||||
- name: "install Rust nightly"
|
||||
uses: dtolnay/rust-toolchain@nightly
|
||||
- name: Select minimal versions
|
||||
run: cargo update -Z minimal-versions
|
||||
run: |
|
||||
cargo update -Z minimal-versions
|
||||
cargo update -p lazy_static --precise 1.5.0
|
||||
- name: Check
|
||||
run: |
|
||||
rustup default ${{ env.MSRV }}
|
||||
@ -90,7 +92,9 @@ jobs:
|
||||
- name: "install Rust nightly"
|
||||
uses: dtolnay/rust-toolchain@nightly
|
||||
- name: Select minimal versions
|
||||
run: cargo update -Z minimal-versions
|
||||
run: |
|
||||
cargo update -Z minimal-versions
|
||||
cargo update -p lazy_static --precise 1.5.0
|
||||
- name: test
|
||||
run: |
|
||||
rustup default ${{ env.MSRV }}
|
||||
|
11
Cargo.toml
11
Cargo.toml
@ -12,16 +12,15 @@ futures = "0.3.22"
|
||||
futures-core = "0.3.22"
|
||||
futures-util = { version = "0.3.22", default-features = false }
|
||||
hdrhistogram = { version = "7.0", default-features = false }
|
||||
http = "0.2"
|
||||
http = "1"
|
||||
indexmap = "2.0.2"
|
||||
lazy_static = "1.4.0"
|
||||
pin-project-lite = "0.2.7"
|
||||
quickcheck = "1"
|
||||
rand = "0.8"
|
||||
slab = "0.4"
|
||||
sync_wrapper = "0.1.1"
|
||||
rand = "0.9"
|
||||
slab = "0.4.9"
|
||||
sync_wrapper = "1"
|
||||
tokio = "1.6.2"
|
||||
tokio-stream = "0.1.0"
|
||||
tokio-stream = "0.1.1"
|
||||
tokio-test = "0.4"
|
||||
tokio-util = { version = "0.7.0", default-features = false }
|
||||
tracing = { version = "0.1.2", default-features = false }
|
||||
|
@ -35,6 +35,10 @@ Tower will keep a rolling MSRV (minimum supported Rust version) policy of **at
|
||||
least** 6 months. When increasing the MSRV, the new Rust version must have been
|
||||
released at least six months ago. The current MSRV is 1.64.0.
|
||||
|
||||
## `no_std`
|
||||
|
||||
`tower` itself is _not_ `no_std` compatible, but `tower-layer` and `tower-service` are.
|
||||
|
||||
## Getting Started
|
||||
|
||||
If you're brand new to Tower and want to start with the basics we recommend you
|
||||
|
@ -10,7 +10,7 @@ edition = "2018"
|
||||
tower = { version = "0.4", path = "../tower", features = ["full"] }
|
||||
tower-service = "0.3"
|
||||
tokio = { version = "1.0", features = ["full"] }
|
||||
rand = "0.8"
|
||||
rand = "0.9"
|
||||
pin-project = "1.0"
|
||||
futures = "0.3.22"
|
||||
tracing = "0.1"
|
||||
|
@ -22,5 +22,5 @@ edition = "2018"
|
||||
[dependencies]
|
||||
|
||||
[dev-dependencies]
|
||||
tower-service = { version = "0.3.0", path = "../tower-service" }
|
||||
tower = { version = "0.5.0", path = "../tower" }
|
||||
tower-service = { path = "../tower-service" }
|
||||
tower = { path = "../tower" }
|
||||
|
@ -30,6 +30,8 @@ reusable components that can be applied to very different kinds of services;
|
||||
for example, it can be applied to services operating on different protocols,
|
||||
and to both the client and server side of a network transaction.
|
||||
|
||||
`tower-layer` is `no_std` compatible.
|
||||
|
||||
## License
|
||||
|
||||
This project is licensed under the [MIT license](LICENSE).
|
||||
@ -40,4 +42,4 @@ Unless you explicitly state otherwise, any contribution intentionally submitted
|
||||
for inclusion in Tower by you, shall be licensed as MIT, without any additional
|
||||
terms or conditions.
|
||||
|
||||
[Tower]: https://crates.io/crates/tower
|
||||
[Tower]: https://crates.io/crates/tower
|
||||
|
@ -1,5 +1,5 @@
|
||||
use super::Layer;
|
||||
use std::fmt;
|
||||
use core::fmt;
|
||||
|
||||
/// A no-op middleware.
|
||||
///
|
||||
@ -7,21 +7,35 @@ use std::fmt;
|
||||
/// service without modifying it.
|
||||
///
|
||||
/// [`Service`]: https://docs.rs/tower-service/latest/tower_service/trait.Service.html
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```rust
|
||||
/// use tower_layer::Identity;
|
||||
/// use tower_layer::Layer;
|
||||
///
|
||||
/// let identity = Identity::new();
|
||||
///
|
||||
/// assert_eq!(identity.layer(42), 42);
|
||||
/// ```
|
||||
#[derive(Default, Clone)]
|
||||
pub struct Identity {
|
||||
_p: (),
|
||||
}
|
||||
|
||||
impl Identity {
|
||||
/// Create a new [`Identity`] value
|
||||
/// Creates a new [`Identity`].
|
||||
///
|
||||
/// ```rust
|
||||
/// use tower_layer::Identity;
|
||||
///
|
||||
/// let identity = Identity::new();
|
||||
/// ```
|
||||
pub const fn new() -> Identity {
|
||||
Identity { _p: () }
|
||||
}
|
||||
}
|
||||
|
||||
/// Decorates a [`Service`], transforming either the request or the response.
|
||||
///
|
||||
/// [`Service`]: https://docs.rs/tower-service/latest/tower_service/trait.Service.html
|
||||
impl<S> Layer<S> for Identity {
|
||||
type Service = S;
|
||||
|
||||
|
@ -1,22 +1,26 @@
|
||||
use super::Layer;
|
||||
use std::fmt;
|
||||
use core::fmt;
|
||||
|
||||
/// Returns a new [`LayerFn`] that implements [`Layer`] by calling the
|
||||
/// given function.
|
||||
///
|
||||
/// The [`Layer::layer`] method takes a type implementing [`Service`] and
|
||||
/// The [`Layer::layer()`] method takes a type implementing [`Service`] and
|
||||
/// returns a different type implementing [`Service`]. In many cases, this can
|
||||
/// be implemented by a function or a closure. The [`LayerFn`] helper allows
|
||||
/// writing simple [`Layer`] implementations without needing the boilerplate of
|
||||
/// a new struct implementing [`Layer`].
|
||||
///
|
||||
/// # Example
|
||||
/// [`Service`]: https://docs.rs/tower-service/latest/tower_service/trait.Service.html
|
||||
/// [`Layer::layer()`]: crate::Layer::layer
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```rust
|
||||
/// # use tower::Service;
|
||||
/// # use std::task::{Poll, Context};
|
||||
/// # use core::task::{Poll, Context};
|
||||
/// # use tower_layer::{Layer, layer_fn};
|
||||
/// # use std::fmt;
|
||||
/// # use std::convert::Infallible;
|
||||
/// # use core::fmt;
|
||||
/// # use core::convert::Infallible;
|
||||
/// #
|
||||
/// // A middleware that logs requests before forwarding them to another service
|
||||
/// pub struct LogService<S> {
|
||||
@ -61,9 +65,6 @@ use std::fmt;
|
||||
/// // Wrap our service in a `LogService` so requests are logged.
|
||||
/// let wrapped_service = log_layer.layer(uppercase_service);
|
||||
/// ```
|
||||
///
|
||||
/// [`Service`]: https://docs.rs/tower-service/latest/tower_service/trait.Service.html
|
||||
/// [`Layer::layer`]: crate::Layer::layer
|
||||
pub fn layer_fn<T>(f: T) -> LayerFn<T> {
|
||||
LayerFn { f }
|
||||
}
|
||||
@ -88,7 +89,7 @@ where
|
||||
impl<F> fmt::Debug for LayerFn<F> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("LayerFn")
|
||||
.field("f", &format_args!("{}", std::any::type_name::<F>()))
|
||||
.field("f", &format_args!("{}", core::any::type_name::<F>()))
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@ -96,6 +97,7 @@ impl<F> fmt::Debug for LayerFn<F> {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use alloc::{format, string::ToString};
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[test]
|
||||
@ -108,7 +110,7 @@ mod tests {
|
||||
|
||||
assert_eq!(
|
||||
"LayerFn { f: tower_layer::layer_fn::tests::layer_fn_has_useful_debug_impl::{{closure}} }".to_string(),
|
||||
format!("{:?}", layer),
|
||||
format!("{layer:?}"),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,11 @@
|
||||
//!
|
||||
//! [`Service`]: https://docs.rs/tower/*/tower/trait.Service.html
|
||||
|
||||
#![no_std]
|
||||
|
||||
#[cfg(test)]
|
||||
extern crate alloc;
|
||||
|
||||
mod identity;
|
||||
mod layer_fn;
|
||||
mod stack;
|
||||
@ -41,9 +46,9 @@ pub use self::{
|
||||
///
|
||||
/// ```rust
|
||||
/// # use tower_service::Service;
|
||||
/// # use std::task::{Poll, Context};
|
||||
/// # use core::task::{Poll, Context};
|
||||
/// # use tower_layer::Layer;
|
||||
/// # use std::fmt;
|
||||
/// # use core::fmt;
|
||||
///
|
||||
/// pub struct LogLayer {
|
||||
/// target: &'static str,
|
||||
@ -100,7 +105,7 @@ pub trait Layer<S> {
|
||||
fn layer(&self, inner: S) -> Self::Service;
|
||||
}
|
||||
|
||||
impl<'a, T, S> Layer<S> for &'a T
|
||||
impl<T, S> Layer<S> for &T
|
||||
where
|
||||
T: ?Sized + Layer<S>,
|
||||
{
|
||||
|
@ -1,7 +1,22 @@
|
||||
use super::Layer;
|
||||
use std::fmt;
|
||||
use core::fmt;
|
||||
|
||||
/// Two middlewares chained together.
|
||||
/// Two [`Layer`]s chained together.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```rust
|
||||
/// use tower_layer::{Stack, layer_fn, Layer};
|
||||
///
|
||||
/// let inner = layer_fn(|service| service+2);
|
||||
/// let outer = layer_fn(|service| service*2);
|
||||
///
|
||||
/// let inner_outer_stack = Stack::new(inner, outer);
|
||||
///
|
||||
/// // (4 + 2) * 2 = 12
|
||||
/// // (4 * 2) + 2 = 10
|
||||
/// assert_eq!(inner_outer_stack.layer(4), 12);
|
||||
/// ```
|
||||
#[derive(Clone)]
|
||||
pub struct Stack<Inner, Outer> {
|
||||
inner: Inner,
|
||||
@ -9,7 +24,15 @@ pub struct Stack<Inner, Outer> {
|
||||
}
|
||||
|
||||
impl<Inner, Outer> Stack<Inner, Outer> {
|
||||
/// Create a new `Stack`.
|
||||
/// Creates a new [`Stack`].
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```rust
|
||||
/// use tower_layer::{Stack, Identity};
|
||||
///
|
||||
/// let stack = Stack::new(Identity::new(), Identity::new());
|
||||
/// ```
|
||||
pub const fn new(inner: Inner, outer: Outer) -> Self {
|
||||
Stack { inner, outer }
|
||||
}
|
||||
|
@ -45,6 +45,11 @@ middleware may take actions such as modify the request.
|
||||
|
||||
[`Service`]: https://docs.rs/tower-service/latest/tower_service/trait.Service.html
|
||||
[Tower]: https://crates.io/crates/tower
|
||||
|
||||
## `no_std`
|
||||
|
||||
`tower-service` is `no_std` compatible, but it (currently) requires the `alloc` crate.
|
||||
|
||||
## License
|
||||
|
||||
This project is licensed under the [MIT license](LICENSE).
|
||||
|
@ -13,8 +13,16 @@
|
||||
//! request / response clients and servers. It is simple but powerful and is
|
||||
//! used as the foundation for the rest of Tower.
|
||||
|
||||
use std::future::Future;
|
||||
use std::task::{Context, Poll};
|
||||
#![no_std]
|
||||
|
||||
extern crate alloc;
|
||||
|
||||
use alloc::boxed::Box;
|
||||
|
||||
use core::future::Future;
|
||||
use core::marker::Sized;
|
||||
use core::result::Result;
|
||||
use core::task::{Context, Poll};
|
||||
|
||||
/// An asynchronous function from a `Request` to a `Response`.
|
||||
///
|
||||
@ -91,13 +99,14 @@ use std::task::{Context, Poll};
|
||||
/// As an example, here is how a Redis request would be issued:
|
||||
///
|
||||
/// ```rust,ignore
|
||||
/// let client = redis::Client::new()
|
||||
/// let mut client = redis::Client::new()
|
||||
/// .connect("127.0.0.1:6379".parse().unwrap())
|
||||
/// .unwrap();
|
||||
///
|
||||
/// ServiceExt::<Cmd>::ready(&mut client).await?;
|
||||
///
|
||||
/// let resp = client.call(Cmd::set("foo", "this is the value of foo")).await?;
|
||||
///
|
||||
/// // Wait for the future to resolve
|
||||
/// println!("Redis response: {:?}", resp);
|
||||
/// ```
|
||||
///
|
||||
@ -273,7 +282,7 @@ use std::task::{Context, Poll};
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// You should instead use [`std::mem::replace`] to take the service that was ready:
|
||||
/// You should instead use [`core::mem::replace`] to take the service that was ready:
|
||||
///
|
||||
/// ```rust
|
||||
/// # use std::pin::Pin;
|
||||
@ -355,9 +364,9 @@ pub trait Service<Request> {
|
||||
fn call(&mut self, req: Request) -> Self::Future;
|
||||
}
|
||||
|
||||
impl<'a, S, Request> Service<Request> for &'a mut S
|
||||
impl<S, Request> Service<Request> for &mut S
|
||||
where
|
||||
S: Service<Request> + 'a,
|
||||
S: Service<Request> + ?Sized,
|
||||
{
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
|
@ -20,7 +20,6 @@ categories = ["asynchronous", "network-programming"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
futures-util = { workspace = true }
|
||||
tokio = { workspace = true, features = ["sync"] }
|
||||
tokio-test = { workspace = true }
|
||||
tower-layer = { version = "0.3", path = "../tower-layer" }
|
||||
|
@ -1,14 +1,13 @@
|
||||
//! Future types
|
||||
|
||||
use crate::mock::error::{self, Error};
|
||||
use futures_util::ready;
|
||||
use pin_project_lite::pin_project;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
|
||||
pin_project! {
|
||||
|
@ -18,7 +18,6 @@ use std::{
|
||||
future::Future,
|
||||
sync::{Arc, Mutex},
|
||||
task::{Context, Poll},
|
||||
u64,
|
||||
};
|
||||
|
||||
/// Spawn a layer onto a mock service.
|
||||
|
@ -5,6 +5,24 @@ All notable changes to this project will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
# 0.5.2
|
||||
|
||||
### Added
|
||||
|
||||
- **util**: Add `BoxCloneSyncService` which is a `Clone + Send + Sync` boxed `Service` ([#777])
|
||||
- **util**: Add `BoxCloneSyncServiceLayer` which is a `Clone + Send + Sync` boxed `Layer` ([802])
|
||||
|
||||
[#777]: https://github.com/tower-rs/tower/pull/777
|
||||
[#802]: https://github.com/tower-rs/tower/pull/802
|
||||
|
||||
# 0.5.1
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fix minimum version of `tower-layer` dependency ([#787])
|
||||
|
||||
[#787]: https://github.com/tower-rs/tower/pull/787
|
||||
|
||||
# 0.5.0
|
||||
|
||||
### Fixed
|
||||
@ -19,7 +37,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
increases the flexibility of the retry policy. To update, update your method signature to include `mut` for both parameters. ([#584])
|
||||
- **retry**: **Breaking Change** Change Policy to accept &mut self ([#681])
|
||||
- **retry**: Add generic backoff utilities ([#685])
|
||||
- **retry**: Add Budget trait. This allows end-users to implement their own budget and bucket implementations. ([#703])
|
||||
- **retry**: **Breaking Change** `Budget` is now a trait. This allows end-users to implement their own budget and bucket implementations. ([#703])
|
||||
- **reconnect**: **Breaking Change** Remove unused generic parameter from `Reconnect::new` ([#755])
|
||||
- **ready-cache**: Allow iteration over ready services ([#700])
|
||||
- **discover**: Implement `Clone` for Change ([#701])
|
||||
@ -28,6 +46,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- **filter**: Derive `Clone` for `AsyncFilterLayer` ([#731])
|
||||
- **general**: Update IndexMap ([#741])
|
||||
- **MSRV**: Increase MSRV to 1.63.0 ([#741])
|
||||
- **util**: **Breaking Change** `Either::A` and `Either::B` have been renamed `Either::Left` and `Either::Right`, respectively. ([#637])
|
||||
- **util**: **Breaking Change** `Either` now requires its two services to have the same error type. ([#637])
|
||||
- **util**: **Breaking Change** `Either` no longer implemenmts `Future`. ([#637])
|
||||
- **buffer**: **Breaking Change** `Buffer<S, Request>` is now generic over `Buffer<Request, S::Future>.` ([#654])
|
||||
- **buffer**: **Breaking Change** `Buffer`'s capacity now correctly matches the specified size. Previously, the
|
||||
capacity was subtly off-by-one, because a slot was held even while the worker task was processing a message. ([#635])
|
||||
|
||||
[#702]: https://github.com/tower-rs/tower/pull/702
|
||||
[#652]: https://github.com/tower-rs/tower/pull/652
|
||||
@ -42,6 +66,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
[#716]: https://github.com/tower-rs/tower/pull/716
|
||||
[#731]: https://github.com/tower-rs/tower/pull/731
|
||||
[#741]: https://github.com/tower-rs/tower/pull/741
|
||||
[#637]: https://github.com/tower-rs/tower/pull/637
|
||||
[#654]: https://github.com/tower-rs/tower/pull/654
|
||||
[#635]: https://github.com/tower-rs/tower/pull/635
|
||||
|
||||
# 0.4.12 (February 16, 2022)
|
||||
|
||||
|
@ -1,18 +1,15 @@
|
||||
[package]
|
||||
name = "tower"
|
||||
# When releasing to crates.io:
|
||||
# - Update doc url
|
||||
# - Cargo.toml
|
||||
# - README.md
|
||||
# - Update README.md
|
||||
# - Update CHANGELOG.md.
|
||||
# - Create "vX.X.X" git tag.
|
||||
version = "0.5.0"
|
||||
version = "0.5.2"
|
||||
authors = ["Tower Maintainers <team@tower-rs.com>"]
|
||||
license = "MIT"
|
||||
readme = "README.md"
|
||||
repository = "https://github.com/tower-rs/tower"
|
||||
homepage = "https://github.com/tower-rs/tower"
|
||||
documentation = "https://docs.rs/tower/0.5.0"
|
||||
description = """
|
||||
Tower is a library of modular and reusable components for building robust
|
||||
clients and servers.
|
||||
@ -23,10 +20,6 @@ edition = "2018"
|
||||
rust-version = "1.64.0"
|
||||
|
||||
[features]
|
||||
|
||||
# Internal
|
||||
__common = ["futures-core", "pin-project-lite"]
|
||||
|
||||
full = [
|
||||
"balance",
|
||||
"buffer",
|
||||
@ -48,33 +41,33 @@ full = [
|
||||
# FIXME: Use weak dependency once available (https://github.com/rust-lang/cargo/issues/8832)
|
||||
log = ["tracing/log"]
|
||||
balance = ["discover", "load", "ready-cache", "make", "slab", "util"]
|
||||
buffer = ["__common", "tokio/sync", "tokio/rt", "tokio-util", "tracing"]
|
||||
discover = ["__common"]
|
||||
filter = ["__common", "futures-util"]
|
||||
buffer = ["tokio/sync", "tokio/rt", "tokio-util", "tracing", "pin-project-lite"]
|
||||
discover = ["futures-core", "pin-project-lite"]
|
||||
filter = ["futures-util", "pin-project-lite"]
|
||||
hedge = ["util", "filter", "futures-util", "hdrhistogram", "tokio/time", "tracing"]
|
||||
limit = ["__common", "tokio/time", "tokio/sync", "tokio-util", "tracing"]
|
||||
load = ["__common", "tokio/time", "tracing"]
|
||||
load-shed = ["__common"]
|
||||
make = ["futures-util", "pin-project-lite", "tokio/io-std"]
|
||||
limit = ["tokio/time", "tokio/sync", "tokio-util", "tracing", "pin-project-lite"]
|
||||
load = ["tokio/time", "tracing", "pin-project-lite"]
|
||||
load-shed = ["pin-project-lite"]
|
||||
make = ["pin-project-lite", "tokio"]
|
||||
ready-cache = ["futures-core", "futures-util", "indexmap", "tokio/sync", "tracing", "pin-project-lite"]
|
||||
reconnect = ["make", "tokio/io-std", "tracing"]
|
||||
retry = ["__common", "tokio/time", "util"]
|
||||
spawn-ready = ["__common", "futures-util", "tokio/sync", "tokio/rt", "util", "tracing"]
|
||||
reconnect = ["make", "tracing"]
|
||||
retry = ["tokio/time", "util"]
|
||||
spawn-ready = ["futures-util", "tokio/sync", "tokio/rt", "util", "tracing"]
|
||||
steer = []
|
||||
timeout = ["pin-project-lite", "tokio/time"]
|
||||
util = ["__common", "futures-util", "pin-project-lite", "sync_wrapper"]
|
||||
util = ["futures-core", "futures-util", "pin-project-lite", "sync_wrapper"]
|
||||
tokio-stream = [] # TODO: Remove this feature at the next breaking release.
|
||||
|
||||
[dependencies]
|
||||
tower-layer = { version = "0.3.1", path = "../tower-layer" }
|
||||
tower-service = { version = "0.3.1", path = "../tower-service" }
|
||||
tower-layer = { version = "0.3.3", path = "../tower-layer" }
|
||||
tower-service = { version = "0.3.3", path = "../tower-service" }
|
||||
|
||||
futures-core = { workspace = true, optional = true }
|
||||
futures-util = { workspace = true, features = ["alloc"], optional = true }
|
||||
hdrhistogram = { workspace = true, optional = true }
|
||||
indexmap = { workspace = true, optional = true }
|
||||
slab = { workspace = true, optional = true }
|
||||
tokio = { workspace = true, features = ["sync"], optional = true }
|
||||
tokio-stream = { workspace = true, optional = true }
|
||||
tokio = { workspace = true, optional = true }
|
||||
tokio-util = { workspace = true, optional = true }
|
||||
tracing = { workspace = true, features = ["std"], optional = true }
|
||||
pin-project-lite = { workspace = true, optional = true }
|
||||
@ -83,7 +76,6 @@ sync_wrapper = { workspace = true, optional = true }
|
||||
[dev-dependencies]
|
||||
futures = { workspace = true }
|
||||
hdrhistogram = { workspace = true }
|
||||
pin-project-lite = { workspace = true }
|
||||
tokio = { workspace = true, features = ["macros", "sync", "test-util", "rt-multi-thread"] }
|
||||
tokio-stream = { workspace = true }
|
||||
tokio-test = { workspace = true }
|
||||
@ -91,7 +83,6 @@ tower-test = { version = "0.4", path = "../tower-test" }
|
||||
tracing = { workspace = true, features = ["std"] }
|
||||
tracing-subscriber = { workspace = true, features = ["fmt", "ansi"] }
|
||||
http = { workspace = true }
|
||||
lazy_static = { workspace = true }
|
||||
rand = { workspace = true, features = ["small_rng"] }
|
||||
quickcheck = { workspace = true }
|
||||
|
||||
|
@ -141,14 +141,14 @@ To get started using all of Tower's optional middleware, add this to your
|
||||
`Cargo.toml`:
|
||||
|
||||
```toml
|
||||
tower = { version = "0.5.0", features = ["full"] }
|
||||
tower = { version = "0.5.1", features = ["full"] }
|
||||
```
|
||||
|
||||
Alternatively, you can only enable some features. For example, to enable
|
||||
only the [`retry`] and [`timeout`][timeouts] middleware, write:
|
||||
|
||||
```toml
|
||||
tower = { version = "0.5.0", features = ["retry", "timeout"] }
|
||||
tower = { version = "0.5.1", features = ["retry", "timeout"] }
|
||||
```
|
||||
|
||||
See [here][all_layers] for a complete list of all middleware provided by
|
||||
|
@ -4,7 +4,6 @@ use futures_core::{Stream, TryStream};
|
||||
use futures_util::{stream, stream::StreamExt, stream::TryStreamExt};
|
||||
use hdrhistogram::Histogram;
|
||||
use pin_project_lite::pin_project;
|
||||
use rand::{self, Rng};
|
||||
use std::hash::Hash;
|
||||
use std::time::Duration;
|
||||
use std::{
|
||||
@ -46,13 +45,13 @@ struct Summary {
|
||||
async fn main() {
|
||||
tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::default()).unwrap();
|
||||
|
||||
println!("REQUESTS={}", REQUESTS);
|
||||
println!("CONCURRENCY={}", CONCURRENCY);
|
||||
println!("ENDPOINT_CAPACITY={}", ENDPOINT_CAPACITY);
|
||||
println!("REQUESTS={REQUESTS}");
|
||||
println!("CONCURRENCY={CONCURRENCY}");
|
||||
println!("ENDPOINT_CAPACITY={ENDPOINT_CAPACITY}");
|
||||
print!("MAX_ENDPOINT_LATENCIES=[");
|
||||
for max in &MAX_ENDPOINT_LATENCIES {
|
||||
let l = max.as_secs() * 1_000 + u64::from(max.subsec_millis());
|
||||
print!("{}ms, ", l);
|
||||
print!("{l}ms, ");
|
||||
}
|
||||
println!("]");
|
||||
|
||||
@ -124,7 +123,7 @@ fn gen_disco() -> impl Discover<
|
||||
|
||||
let maxms = u64::from(latency.subsec_millis())
|
||||
.saturating_add(latency.as_secs().saturating_mul(1_000));
|
||||
let latency = Duration::from_millis(rand::thread_rng().gen_range(0..maxms));
|
||||
let latency = Duration::from_millis(rand::random_range(0..maxms));
|
||||
|
||||
async move {
|
||||
time::sleep_until(start + latency).await;
|
||||
@ -149,7 +148,7 @@ where
|
||||
<D::Service as Service<Req>>::Future: Send,
|
||||
<D::Service as load::Load>::Metric: std::fmt::Debug,
|
||||
{
|
||||
println!("{}", name);
|
||||
println!("{name}");
|
||||
|
||||
let requests = stream::repeat(Req).take(REQUESTS);
|
||||
let service = ConcurrencyLimit::new(lb, CONCURRENCY);
|
||||
@ -193,7 +192,7 @@ impl Summary {
|
||||
}
|
||||
for (i, c) in self.count_by_instance.iter().enumerate() {
|
||||
let p = *c as f64 / total as f64 * 100.0;
|
||||
println!(" [{:02}] {:>5.01}%", i, p);
|
||||
println!(" [{i:02}] {p:>5.01}%");
|
||||
}
|
||||
|
||||
println!(" wall {:4}s", self.start.elapsed().as_secs());
|
||||
|
@ -1,6 +1,5 @@
|
||||
use super::Balance;
|
||||
use crate::discover::Discover;
|
||||
use futures_core::ready;
|
||||
use pin_project_lite::pin_project;
|
||||
use std::hash::Hash;
|
||||
use std::marker::PhantomData;
|
||||
@ -8,7 +7,7 @@ use std::{
|
||||
fmt,
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
use tower_service::Service;
|
||||
|
||||
|
@ -3,14 +3,13 @@ use crate::discover::{Change, Discover};
|
||||
use crate::load::Load;
|
||||
use crate::ready_cache::{error::Failed, ReadyCache};
|
||||
use crate::util::rng::{sample_floyd2, HasherRng, Rng};
|
||||
use futures_core::ready;
|
||||
use futures_util::future::{self, TryFutureExt};
|
||||
use std::hash::Hash;
|
||||
use std::marker::PhantomData;
|
||||
use std::{
|
||||
fmt,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
use tower_service::Service;
|
||||
use tracing::{debug, trace};
|
||||
|
@ -3,12 +3,11 @@
|
||||
//! [`Buffer`]: crate::buffer::Buffer
|
||||
|
||||
use super::{error::Closed, message};
|
||||
use futures_core::ready;
|
||||
use pin_project_lite::pin_project;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
|
||||
pin_project! {
|
||||
|
@ -56,7 +56,7 @@ where
|
||||
}
|
||||
|
||||
impl<Request> fmt::Debug for BufferLayer<Request> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("BufferLayer")
|
||||
.field("bound", &self.bound)
|
||||
.finish()
|
||||
@ -65,10 +65,7 @@ impl<Request> fmt::Debug for BufferLayer<Request> {
|
||||
|
||||
impl<Request> Clone for BufferLayer<Request> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
bound: self.bound,
|
||||
_p: PhantomData,
|
||||
}
|
||||
*self
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2,12 +2,11 @@ use super::{
|
||||
error::{Closed, ServiceError},
|
||||
message::Message,
|
||||
};
|
||||
use futures_core::ready;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
use tokio::sync::mpsc;
|
||||
use tower_service::Service;
|
||||
|
@ -635,7 +635,7 @@ impl<L> ServiceBuilder<L> {
|
||||
/// impl Service<Request> for MyService {
|
||||
/// type Response = Response;
|
||||
/// type Error = Error;
|
||||
/// type Future = futures_util::future::Ready<Result<Response, Error>>;
|
||||
/// type Future = std::future::Ready<Result<Response, Error>>;
|
||||
///
|
||||
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
/// // ...
|
||||
@ -789,6 +789,68 @@ impl<L> ServiceBuilder<L> {
|
||||
{
|
||||
self.layer(crate::util::BoxCloneService::layer())
|
||||
}
|
||||
|
||||
/// This wraps the inner service with the [`Layer`] returned by [`BoxCloneSyncServiceLayer`].
|
||||
///
|
||||
/// This is similar to the [`boxed_clone`] method, but it requires that `Self` implement
|
||||
/// [`Sync`], and the returned boxed service implements [`Sync`].
|
||||
///
|
||||
/// See [`BoxCloneSyncService`] for more details.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// use tower::{Service, ServiceBuilder, BoxError, util::BoxCloneSyncService};
|
||||
/// use std::time::Duration;
|
||||
/// #
|
||||
/// # struct Request;
|
||||
/// # struct Response;
|
||||
/// # impl Response {
|
||||
/// # fn new() -> Self { Self }
|
||||
/// # }
|
||||
///
|
||||
/// let service: BoxCloneSyncService<Request, Response, BoxError> = ServiceBuilder::new()
|
||||
/// .load_shed()
|
||||
/// .concurrency_limit(64)
|
||||
/// .timeout(Duration::from_secs(10))
|
||||
/// .boxed_clone_sync()
|
||||
/// .service_fn(|req: Request| async {
|
||||
/// Ok::<_, BoxError>(Response::new())
|
||||
/// });
|
||||
/// # let service = assert_service(service);
|
||||
///
|
||||
/// // The boxed service can still be cloned.
|
||||
/// service.clone();
|
||||
/// # fn assert_service<S, R>(svc: S) -> S
|
||||
/// # where S: Service<R> { svc }
|
||||
/// ```
|
||||
///
|
||||
/// [`BoxCloneSyncServiceLayer`]: crate::util::BoxCloneSyncServiceLayer
|
||||
/// [`BoxCloneSyncService`]: crate::util::BoxCloneSyncService
|
||||
/// [`boxed_clone`]: Self::boxed_clone
|
||||
#[cfg(feature = "util")]
|
||||
pub fn boxed_clone_sync<S, R>(
|
||||
self,
|
||||
) -> ServiceBuilder<
|
||||
Stack<
|
||||
crate::util::BoxCloneSyncServiceLayer<
|
||||
S,
|
||||
R,
|
||||
<L::Service as Service<R>>::Response,
|
||||
<L::Service as Service<R>>::Error,
|
||||
>,
|
||||
Identity,
|
||||
>,
|
||||
>
|
||||
where
|
||||
L: Layer<S> + Send + Sync + 'static,
|
||||
L::Service: Service<R> + Clone + Send + Sync + 'static,
|
||||
<L::Service as Service<R>>::Future: Send + Sync + 'static,
|
||||
{
|
||||
let layer = self.into_inner();
|
||||
|
||||
ServiceBuilder::new().layer(crate::util::BoxCloneSyncServiceLayer::new(layer))
|
||||
}
|
||||
}
|
||||
|
||||
impl<L: fmt::Debug> fmt::Debug for ServiceBuilder<L> {
|
||||
|
@ -12,7 +12,8 @@
|
||||
//! # Examples
|
||||
//!
|
||||
//! ```rust
|
||||
//! use futures_util::{future::poll_fn, pin_mut};
|
||||
//! use std::future::poll_fn;
|
||||
//! use futures_util::pin_mut;
|
||||
//! use tower::discover::{Change, Discover};
|
||||
//! async fn services_monitor<D: Discover>(services: D) {
|
||||
//! pin_mut!(services);
|
||||
|
@ -2,12 +2,11 @@
|
||||
|
||||
use super::AsyncPredicate;
|
||||
use crate::BoxError;
|
||||
use futures_core::ready;
|
||||
use pin_project_lite::pin_project;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
use tower_service::Service;
|
||||
|
||||
@ -35,7 +34,7 @@ opaque_future! {
|
||||
/// [`Filter`]: crate::filter::Filter
|
||||
pub type ResponseFuture<R, F> =
|
||||
futures_util::future::Either<
|
||||
futures_util::future::Ready<Result<R, crate::BoxError>>,
|
||||
std::future::Ready<Result<R, crate::BoxError>>,
|
||||
futures_util::future::ErrInto<F, crate::BoxError>
|
||||
>;
|
||||
}
|
||||
|
@ -114,7 +114,7 @@ where
|
||||
fn call(&mut self, request: Request) -> Self::Future {
|
||||
ResponseFuture::new(match self.predicate.check(request) {
|
||||
Ok(request) => Either::Right(self.inner.call(request).err_into()),
|
||||
Err(e) => Either::Left(futures_util::future::ready(Err(e))),
|
||||
Err(e) => Either::Left(std::future::ready(Err(e))),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -1,10 +1,9 @@
|
||||
use futures_util::ready;
|
||||
use pin_project_lite::pin_project;
|
||||
use std::time::Duration;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
use tower_service::Service;
|
||||
|
||||
|
@ -1,10 +1,9 @@
|
||||
use futures_util::ready;
|
||||
use pin_project_lite::pin_project;
|
||||
use std::time::Duration;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
use tokio::time::Instant;
|
||||
use tower_service::Service;
|
||||
|
@ -4,11 +4,12 @@
|
||||
#![warn(missing_debug_implementations, missing_docs, unreachable_pub)]
|
||||
|
||||
use crate::filter::AsyncFilter;
|
||||
use futures_util::future;
|
||||
use futures_util::future::Either;
|
||||
use pin_project_lite::pin_project;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
use std::{
|
||||
future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
@ -222,7 +223,7 @@ impl<P, Request> crate::filter::AsyncPredicate<Request> for PolicyPredicate<P>
|
||||
where
|
||||
P: Policy<Request>,
|
||||
{
|
||||
type Future = future::Either<
|
||||
type Future = Either<
|
||||
future::Ready<Result<Request, crate::BoxError>>,
|
||||
future::Pending<Result<Request, crate::BoxError>>,
|
||||
>;
|
||||
@ -230,13 +231,13 @@ where
|
||||
|
||||
fn check(&mut self, request: Request) -> Self::Future {
|
||||
if self.0.can_retry(&request) {
|
||||
future::Either::Left(future::ready(Ok(request)))
|
||||
Either::Left(future::ready(Ok(request)))
|
||||
} else {
|
||||
// If the hedge retry should not be issued, we simply want to wait
|
||||
// for the result of the original request. Therefore we don't want
|
||||
// to return an error here. Instead, we use future::pending to ensure
|
||||
// that the original request wins the select.
|
||||
future::Either::Right(future::pending())
|
||||
Either::Right(future::pending())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -219,6 +219,7 @@ pub use tower_layer::Layer;
|
||||
pub use tower_service::Service;
|
||||
|
||||
#[allow(unreachable_pub)]
|
||||
#[cfg(any(feature = "balance", feature = "discover", feature = "make"))]
|
||||
mod sealed {
|
||||
pub trait Sealed<T> {}
|
||||
}
|
||||
|
@ -1,12 +1,11 @@
|
||||
//! [`Future`] types
|
||||
//!
|
||||
//! [`Future`]: std::future::Future
|
||||
use futures_core::ready;
|
||||
use pin_project_lite::pin_project;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
use tokio::sync::OwnedSemaphorePermit;
|
||||
|
||||
|
@ -3,10 +3,9 @@ use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||
use tokio_util::sync::PollSemaphore;
|
||||
use tower_service::Service;
|
||||
|
||||
use futures_core::ready;
|
||||
use std::{
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
|
||||
/// Enforces a limit on the concurrent number of requests the underlying
|
||||
|
@ -1,9 +1,8 @@
|
||||
use super::Rate;
|
||||
use futures_core::ready;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
use tokio::time::{Instant, Sleep};
|
||||
use tower_service::Service;
|
||||
|
@ -1,11 +1,10 @@
|
||||
//! Application-specific request completion semantics.
|
||||
|
||||
use futures_core::ready;
|
||||
use pin_project_lite::pin_project;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
|
||||
/// Attaches `H`-typed completion tracker to `V` typed values.
|
||||
|
@ -3,9 +3,9 @@
|
||||
#[cfg(feature = "discover")]
|
||||
use crate::discover::{Change, Discover};
|
||||
#[cfg(feature = "discover")]
|
||||
use futures_core::{ready, Stream};
|
||||
use futures_core::Stream;
|
||||
#[cfg(feature = "discover")]
|
||||
use std::pin::Pin;
|
||||
use std::{pin::Pin, task::ready};
|
||||
|
||||
use super::Load;
|
||||
use pin_project_lite::pin_project;
|
||||
|
@ -3,11 +3,11 @@
|
||||
#[cfg(feature = "discover")]
|
||||
use crate::discover::{Change, Discover};
|
||||
#[cfg(feature = "discover")]
|
||||
use futures_core::{ready, Stream};
|
||||
use futures_core::Stream;
|
||||
#[cfg(feature = "discover")]
|
||||
use pin_project_lite::pin_project;
|
||||
#[cfg(feature = "discover")]
|
||||
use std::pin::Pin;
|
||||
use std::{pin::Pin, task::ready};
|
||||
|
||||
use super::completion::{CompleteOnResponse, TrackCompletion, TrackCompletionFuture};
|
||||
use super::Load;
|
||||
@ -309,8 +309,7 @@ fn nanos(d: Duration) -> f64 {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures_util::future;
|
||||
use std::time::Duration;
|
||||
use std::{future, time::Duration};
|
||||
use tokio::time;
|
||||
use tokio_test::{assert_ready, assert_ready_ok, task};
|
||||
|
||||
@ -327,7 +326,7 @@ mod tests {
|
||||
}
|
||||
|
||||
fn call(&mut self, (): ()) -> Self::Future {
|
||||
future::ok(())
|
||||
future::ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
@ -399,7 +398,7 @@ mod tests {
|
||||
assert_eq!(super::nanos(Duration::new(0, 123)), 123.0);
|
||||
assert_eq!(super::nanos(Duration::new(1, 23)), 1_000_000_023.0);
|
||||
assert_eq!(
|
||||
super::nanos(Duration::new(::std::u64::MAX, 999_999_999)),
|
||||
super::nanos(Duration::new(u64::MAX, 999_999_999)),
|
||||
18446744074709553000.0
|
||||
);
|
||||
}
|
||||
|
@ -3,11 +3,11 @@
|
||||
#[cfg(feature = "discover")]
|
||||
use crate::discover::{Change, Discover};
|
||||
#[cfg(feature = "discover")]
|
||||
use futures_core::{ready, Stream};
|
||||
use futures_core::Stream;
|
||||
#[cfg(feature = "discover")]
|
||||
use pin_project_lite::pin_project;
|
||||
#[cfg(feature = "discover")]
|
||||
use std::pin::Pin;
|
||||
use std::{pin::Pin, task::ready};
|
||||
|
||||
use super::completion::{CompleteOnResponse, TrackCompletion, TrackCompletionFuture};
|
||||
use super::Load;
|
||||
@ -148,8 +148,10 @@ impl RefCount {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use futures_util::future;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{
|
||||
future,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
struct Svc;
|
||||
impl Service<()> for Svc {
|
||||
@ -162,7 +164,7 @@ mod tests {
|
||||
}
|
||||
|
||||
fn call(&mut self, (): ()) -> Self::Future {
|
||||
future::ok(())
|
||||
future::ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3,9 +3,8 @@
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::task::{ready, Context, Poll};
|
||||
|
||||
use futures_core::ready;
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use super::error::Overloaded;
|
||||
|
@ -118,7 +118,7 @@ pub trait MakeService<Target, Request>: Sealed<(Target, Request)> {
|
||||
/// # };
|
||||
/// # }
|
||||
/// ```
|
||||
fn as_service(&mut self) -> AsService<Self, Request>
|
||||
fn as_service(&mut self) -> AsService<'_, Self, Request>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
|
@ -12,10 +12,10 @@ use tower_service::Service;
|
||||
/// # use std::task::{Context, Poll};
|
||||
/// # use std::pin::Pin;
|
||||
/// # use std::convert::Infallible;
|
||||
/// use std::future::{Ready, ready};
|
||||
/// use tower::make::{MakeService, Shared};
|
||||
/// use tower::buffer::Buffer;
|
||||
/// use tower::Service;
|
||||
/// use futures::future::{Ready, ready};
|
||||
///
|
||||
/// // An example connection type
|
||||
/// struct Connection {}
|
||||
@ -93,13 +93,13 @@ where
|
||||
}
|
||||
|
||||
fn call(&mut self, _target: T) -> Self::Future {
|
||||
SharedFuture::new(futures_util::future::ready(Ok(self.service.clone())))
|
||||
SharedFuture::new(std::future::ready(Ok(self.service.clone())))
|
||||
}
|
||||
}
|
||||
|
||||
opaque_future! {
|
||||
/// Response future from [`Shared`] services.
|
||||
pub type SharedFuture<S> = futures_util::future::Ready<Result<S, Infallible>>;
|
||||
pub type SharedFuture<S> = std::future::Ready<Result<S, Infallible>>;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@ -107,7 +107,7 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::make::MakeService;
|
||||
use crate::service_fn;
|
||||
use futures::future::poll_fn;
|
||||
use std::future::poll_fn;
|
||||
|
||||
async fn echo<R>(req: R) -> Result<R, Infallible> {
|
||||
Ok(req)
|
||||
|
@ -79,10 +79,10 @@ where
|
||||
///
|
||||
/// Returns a config validation error if:
|
||||
/// - `min` > `max`
|
||||
/// - `max` > 0
|
||||
/// - `jitter` >= `0.0`
|
||||
/// - `jitter` < `100.0`
|
||||
/// - `jitter` is finite
|
||||
/// - `max` == 0
|
||||
/// - `jitter` < `0.0`
|
||||
/// - `jitter` > `100.0`
|
||||
/// - `jitter` is not finite
|
||||
pub fn new(
|
||||
min: time::Duration,
|
||||
max: time::Duration,
|
||||
|
@ -27,9 +27,8 @@
|
||||
//! # Examples
|
||||
//!
|
||||
//! ```rust
|
||||
//! use std::sync::Arc;
|
||||
//! use std::{future, sync::Arc};
|
||||
//!
|
||||
//! use futures_util::future;
|
||||
//! use tower::retry::{budget::{Budget, TpsBudget}, Policy};
|
||||
//!
|
||||
//! type Req = String;
|
||||
|
@ -72,7 +72,7 @@ impl TpsBudget {
|
||||
assert!(ttl <= Duration::from_secs(60));
|
||||
assert!(retry_percent >= 0.0);
|
||||
assert!(retry_percent <= 1000.0);
|
||||
assert!(min_per_sec < ::std::i32::MAX as u32);
|
||||
assert!(min_per_sec < i32::MAX as u32);
|
||||
|
||||
let (deposit_amount, withdraw_amount) = if retry_percent == 0.0 {
|
||||
// If there is no percent, then you gain nothing from deposits.
|
||||
|
@ -1,11 +1,10 @@
|
||||
//! Future types
|
||||
|
||||
use super::{Policy, Retry};
|
||||
use futures_core::ready;
|
||||
use pin_project_lite::pin_project;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::task::{ready, Context, Poll};
|
||||
use tower_service::Service;
|
||||
|
||||
pin_project! {
|
||||
|
@ -6,7 +6,7 @@ use std::future::Future;
|
||||
///
|
||||
/// ```
|
||||
/// use tower::retry::Policy;
|
||||
/// use futures_util::future;
|
||||
/// use std::future;
|
||||
///
|
||||
/// type Req = String;
|
||||
/// type Res = String;
|
||||
@ -91,4 +91,4 @@ pub trait Policy<Req, Res, E> {
|
||||
|
||||
// Ensure `Policy` is object safe
|
||||
#[cfg(test)]
|
||||
fn _obj_safe(_: Box<dyn Policy<(), (), (), Future = futures::future::Ready<()>>>) {}
|
||||
fn _obj_safe(_: Box<dyn Policy<(), (), (), Future = std::future::Ready<()>>>) {}
|
||||
|
@ -1,11 +1,10 @@
|
||||
use super::{future::ResponseFuture, SpawnReadyLayer};
|
||||
use crate::{util::ServiceExt, BoxError};
|
||||
use futures_core::ready;
|
||||
use futures_util::future::TryFutureExt;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
use tower_service::Service;
|
||||
use tracing::Instrument;
|
||||
|
@ -8,9 +8,8 @@
|
||||
//! Here, `GET /` will be sent to the `root` service, while all other requests go to `not_found`.
|
||||
//!
|
||||
//! ```rust
|
||||
//! # use std::task::{Context, Poll};
|
||||
//! # use std::task::{Context, Poll, ready};
|
||||
//! # use tower_service::Service;
|
||||
//! # use futures_util::future::{ready, Ready, poll_fn};
|
||||
//! # use tower::steer::Steer;
|
||||
//! # use tower::service_fn;
|
||||
//! # use tower::util::BoxService;
|
||||
|
129
tower/src/util/boxed/layer_clone_sync.rs
Normal file
129
tower/src/util/boxed/layer_clone_sync.rs
Normal file
@ -0,0 +1,129 @@
|
||||
use std::{fmt, sync::Arc};
|
||||
use tower_layer::{layer_fn, Layer};
|
||||
use tower_service::Service;
|
||||
|
||||
use crate::util::BoxCloneSyncService;
|
||||
|
||||
/// A [`Clone`] + [`Send`] + [`Sync`] boxed [`Layer`].
|
||||
///
|
||||
/// [`BoxCloneSyncServiceLayer`] turns a layer into a trait object, allowing both the [`Layer`] itself
|
||||
/// and the output [`Service`] to be dynamic, while having consistent types.
|
||||
///
|
||||
/// This [`Layer`] produces [`BoxCloneSyncService`] instances erasing the type of the
|
||||
/// [`Service`] produced by the wrapped [`Layer`].
|
||||
///
|
||||
/// This is similar to [`BoxCloneServiceLayer`](super::BoxCloneServiceLayer) except the layer and resulting
|
||||
/// service implements [`Sync`].
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// `BoxCloneSyncServiceLayer` can, for example, be useful to create layers dynamically that otherwise wouldn't have
|
||||
/// the same types, when the underlying service must be clone and sync (for example, when building a Hyper connector).
|
||||
/// In this example, we include a [`Timeout`] layer only if an environment variable is set. We can use
|
||||
/// `BoxCloneSyncServiceLayer` to return a consistent type regardless of runtime configuration:
|
||||
///
|
||||
/// ```
|
||||
/// use std::time::Duration;
|
||||
/// use tower::{Service, ServiceBuilder, BoxError};
|
||||
/// use tower::util::{BoxCloneSyncServiceLayer, BoxCloneSyncService};
|
||||
///
|
||||
/// #
|
||||
/// # struct Request;
|
||||
/// # struct Response;
|
||||
/// # impl Response {
|
||||
/// # fn new() -> Self { Self }
|
||||
/// # }
|
||||
///
|
||||
/// fn common_layer<S, T>() -> BoxCloneSyncServiceLayer<S, T, S::Response, BoxError>
|
||||
/// where
|
||||
/// S: Service<T> + Clone + Send + Sync + 'static,
|
||||
/// S::Future: Send + 'static,
|
||||
/// S::Error: Into<BoxError> + 'static,
|
||||
/// {
|
||||
/// let builder = ServiceBuilder::new()
|
||||
/// .concurrency_limit(100);
|
||||
///
|
||||
/// if std::env::var("SET_TIMEOUT").is_ok() {
|
||||
/// let layer = builder
|
||||
/// .timeout(Duration::from_secs(30))
|
||||
/// .into_inner();
|
||||
///
|
||||
/// BoxCloneSyncServiceLayer::new(layer)
|
||||
/// } else {
|
||||
/// let layer = builder
|
||||
/// .map_err(Into::into)
|
||||
/// .into_inner();
|
||||
///
|
||||
/// BoxCloneSyncServiceLayer::new(layer)
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// // We can clone the layer (this is true of BoxLayer as well)
|
||||
/// let boxed_clone_sync_layer = common_layer();
|
||||
///
|
||||
/// let cloned_sync_layer = boxed_clone_sync_layer.clone();
|
||||
///
|
||||
/// // Using the `BoxCloneSyncServiceLayer` we can create a `BoxCloneSyncService`
|
||||
/// let service: BoxCloneSyncService<Request, Response, BoxError> = ServiceBuilder::new().layer(cloned_sync_layer)
|
||||
/// .service_fn(|req: Request| async {
|
||||
/// Ok::<_, BoxError>(Response::new())
|
||||
/// });
|
||||
///
|
||||
/// # let service = assert_service(service);
|
||||
///
|
||||
/// // And we can still clone the service
|
||||
/// let cloned_service = service.clone();
|
||||
/// #
|
||||
/// # fn assert_service<S, R>(svc: S) -> S
|
||||
/// # where S: Service<R> { svc }
|
||||
///
|
||||
/// ```
|
||||
///
|
||||
/// [`Layer`]: tower_layer::Layer
|
||||
/// [`Service`]: tower_service::Service
|
||||
/// [`BoxService`]: super::BoxService
|
||||
/// [`Timeout`]: crate::timeout
|
||||
pub struct BoxCloneSyncServiceLayer<In, T, U, E> {
|
||||
boxed: Arc<dyn Layer<In, Service = BoxCloneSyncService<T, U, E>> + Send + Sync + 'static>,
|
||||
}
|
||||
|
||||
impl<In, T, U, E> BoxCloneSyncServiceLayer<In, T, U, E> {
|
||||
/// Create a new [`BoxCloneSyncServiceLayer`].
|
||||
pub fn new<L>(inner_layer: L) -> Self
|
||||
where
|
||||
L: Layer<In> + Send + Sync + 'static,
|
||||
L::Service: Service<T, Response = U, Error = E> + Send + Sync + Clone + 'static,
|
||||
<L::Service as Service<T>>::Future: Send + 'static,
|
||||
{
|
||||
let layer = layer_fn(move |inner: In| {
|
||||
let out = inner_layer.layer(inner);
|
||||
BoxCloneSyncService::new(out)
|
||||
});
|
||||
|
||||
Self {
|
||||
boxed: Arc::new(layer),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<In, T, U, E> Layer<In> for BoxCloneSyncServiceLayer<In, T, U, E> {
|
||||
type Service = BoxCloneSyncService<T, U, E>;
|
||||
|
||||
fn layer(&self, inner: In) -> Self::Service {
|
||||
self.boxed.layer(inner)
|
||||
}
|
||||
}
|
||||
|
||||
impl<In, T, U, E> Clone for BoxCloneSyncServiceLayer<In, T, U, E> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
boxed: Arc::clone(&self.boxed),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<In, T, U, E> fmt::Debug for BoxCloneSyncServiceLayer<In, T, U, E> {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt.debug_struct("BoxCloneSyncServiceLayer").finish()
|
||||
}
|
||||
}
|
@ -1,9 +1,11 @@
|
||||
mod layer;
|
||||
mod layer_clone;
|
||||
mod layer_clone_sync;
|
||||
mod sync;
|
||||
mod unsync;
|
||||
|
||||
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
|
||||
pub use self::{
|
||||
layer::BoxLayer, layer_clone::BoxCloneServiceLayer, sync::BoxService, unsync::UnsyncBoxService,
|
||||
layer::BoxLayer, layer_clone::BoxCloneServiceLayer, layer_clone_sync::BoxCloneSyncServiceLayer,
|
||||
sync::BoxService, unsync::UnsyncBoxService,
|
||||
};
|
||||
|
@ -28,7 +28,7 @@ use std::{
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use futures_util::future::ready;
|
||||
/// use std::future::ready;
|
||||
/// # use tower_service::Service;
|
||||
/// # use tower::util::{BoxService, service_fn};
|
||||
/// // Respond to requests using a closure, but closures cannot be named...
|
||||
|
101
tower/src/util/boxed_clone_sync.rs
Normal file
101
tower/src/util/boxed_clone_sync.rs
Normal file
@ -0,0 +1,101 @@
|
||||
use super::ServiceExt;
|
||||
use futures_util::future::BoxFuture;
|
||||
use std::{
|
||||
fmt,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tower_layer::{layer_fn, LayerFn};
|
||||
use tower_service::Service;
|
||||
|
||||
/// A [`Clone`] + [`Send`] + [`Sync`] boxed [`Service`].
|
||||
///
|
||||
/// [`BoxCloneSyncService`] turns a service into a trait object, allowing the
|
||||
/// response future type to be dynamic, and allowing the service to be cloned and shared.
|
||||
///
|
||||
/// This is similar to [`BoxCloneService`](super::BoxCloneService) except the resulting
|
||||
/// service implements [`Sync`].
|
||||
/// ```
|
||||
pub struct BoxCloneSyncService<T, U, E>(
|
||||
Box<
|
||||
dyn CloneService<T, Response = U, Error = E, Future = BoxFuture<'static, Result<U, E>>>
|
||||
+ Send
|
||||
+ Sync,
|
||||
>,
|
||||
);
|
||||
|
||||
impl<T, U, E> BoxCloneSyncService<T, U, E> {
|
||||
/// Create a new `BoxCloneSyncService`.
|
||||
pub fn new<S>(inner: S) -> Self
|
||||
where
|
||||
S: Service<T, Response = U, Error = E> + Clone + Send + Sync + 'static,
|
||||
S::Future: Send + 'static,
|
||||
{
|
||||
let inner = inner.map_future(|f| Box::pin(f) as _);
|
||||
BoxCloneSyncService(Box::new(inner))
|
||||
}
|
||||
|
||||
/// Returns a [`Layer`] for wrapping a [`Service`] in a [`BoxCloneSyncService`]
|
||||
/// middleware.
|
||||
///
|
||||
/// [`Layer`]: crate::Layer
|
||||
pub fn layer<S>() -> LayerFn<fn(S) -> Self>
|
||||
where
|
||||
S: Service<T, Response = U, Error = E> + Clone + Send + Sync + 'static,
|
||||
S::Future: Send + 'static,
|
||||
{
|
||||
layer_fn(Self::new)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U, E> Service<T> for BoxCloneSyncService<T, U, E> {
|
||||
type Response = U;
|
||||
type Error = E;
|
||||
type Future = BoxFuture<'static, Result<U, E>>;
|
||||
|
||||
#[inline]
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), E>> {
|
||||
self.0.poll_ready(cx)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn call(&mut self, request: T) -> Self::Future {
|
||||
self.0.call(request)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U, E> Clone for BoxCloneSyncService<T, U, E> {
|
||||
fn clone(&self) -> Self {
|
||||
Self(self.0.clone_box())
|
||||
}
|
||||
}
|
||||
|
||||
trait CloneService<R>: Service<R> {
|
||||
fn clone_box(
|
||||
&self,
|
||||
) -> Box<
|
||||
dyn CloneService<R, Response = Self::Response, Error = Self::Error, Future = Self::Future>
|
||||
+ Send
|
||||
+ Sync,
|
||||
>;
|
||||
}
|
||||
|
||||
impl<R, T> CloneService<R> for T
|
||||
where
|
||||
T: Service<R> + Send + Sync + Clone + 'static,
|
||||
{
|
||||
fn clone_box(
|
||||
&self,
|
||||
) -> Box<
|
||||
dyn CloneService<R, Response = T::Response, Error = T::Error, Future = T::Future>
|
||||
+ Send
|
||||
+ Sync,
|
||||
> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U, E> fmt::Debug for BoxCloneSyncService<T, U, E> {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt.debug_struct("BoxCloneSyncService").finish()
|
||||
}
|
||||
}
|
@ -1,10 +1,10 @@
|
||||
use futures_core::{ready, Stream};
|
||||
use futures_core::Stream;
|
||||
use pin_project_lite::pin_project;
|
||||
use std::{
|
||||
fmt,
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
use tower_service::Service;
|
||||
|
||||
|
@ -24,7 +24,7 @@ pin_project! {
|
||||
/// # use std::error::Error;
|
||||
/// # use std::rc::Rc;
|
||||
/// #
|
||||
/// use futures::future::{ready, Ready};
|
||||
/// use std::future::{ready, Ready};
|
||||
/// use futures::StreamExt;
|
||||
/// use futures::channel::mpsc;
|
||||
/// use tower_service::Service;
|
||||
|
@ -154,7 +154,7 @@ where
|
||||
self.state = match &mut self.state {
|
||||
State::Future(fut) => {
|
||||
let fut = Pin::new(fut);
|
||||
let svc = futures_core::ready!(fut.poll(cx)?);
|
||||
let svc = std::task::ready!(fut.poll(cx)?);
|
||||
State::Service(svc)
|
||||
}
|
||||
State::Service(svc) => return svc.poll_ready(cx),
|
||||
@ -176,22 +176,24 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::util::{future_service, ServiceExt};
|
||||
use crate::Service;
|
||||
use futures::future::{ready, Ready};
|
||||
use std::convert::Infallible;
|
||||
use std::{
|
||||
convert::Infallible,
|
||||
future::{ready, Ready},
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn pending_service_debug_impl() {
|
||||
let mut pending_svc = future_service(ready(Ok(DebugService)));
|
||||
|
||||
assert_eq!(
|
||||
format!("{:?}", pending_svc),
|
||||
"FutureService { state: State::Future(<futures_util::future::ready::Ready<core::result::Result<tower::util::future_service::tests::DebugService, core::convert::Infallible>>>) }"
|
||||
format!("{pending_svc:?}"),
|
||||
"FutureService { state: State::Future(<core::future::ready::Ready<core::result::Result<tower::util::future_service::tests::DebugService, core::convert::Infallible>>>) }"
|
||||
);
|
||||
|
||||
pending_svc.ready().await.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
format!("{:?}", pending_svc),
|
||||
format!("{pending_svc:?}"),
|
||||
"FutureService { state: State::Service(DebugService) }"
|
||||
);
|
||||
}
|
||||
|
@ -3,6 +3,7 @@
|
||||
mod and_then;
|
||||
mod boxed;
|
||||
mod boxed_clone;
|
||||
mod boxed_clone_sync;
|
||||
mod call_all;
|
||||
mod either;
|
||||
|
||||
@ -23,8 +24,11 @@ pub mod rng;
|
||||
|
||||
pub use self::{
|
||||
and_then::{AndThen, AndThenLayer},
|
||||
boxed::{BoxCloneServiceLayer, BoxLayer, BoxService, UnsyncBoxService},
|
||||
boxed::{
|
||||
BoxCloneServiceLayer, BoxCloneSyncServiceLayer, BoxLayer, BoxService, UnsyncBoxService,
|
||||
},
|
||||
boxed_clone::BoxCloneService,
|
||||
boxed_clone_sync::BoxCloneSyncService,
|
||||
either::Either,
|
||||
future_service::{future_service, FutureService},
|
||||
map_err::{MapErr, MapErrLayer},
|
||||
@ -135,14 +139,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
|
||||
/// # impl Service<u32> for DatabaseService {
|
||||
/// # type Response = Record;
|
||||
/// # type Error = u8;
|
||||
/// # type Future = futures_util::future::Ready<Result<Record, u8>>;
|
||||
/// # type Future = std::future::Ready<Result<Record, u8>>;
|
||||
/// #
|
||||
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
/// # Poll::Ready(Ok(()))
|
||||
/// # }
|
||||
/// #
|
||||
/// # fn call(&mut self, request: u32) -> Self::Future {
|
||||
/// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
|
||||
/// # std::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
|
||||
/// # }
|
||||
/// # }
|
||||
/// #
|
||||
@ -204,14 +208,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
|
||||
/// # impl Service<u32> for DatabaseService {
|
||||
/// # type Response = Record;
|
||||
/// # type Error = u8;
|
||||
/// # type Future = futures_util::future::Ready<Result<Record, u8>>;
|
||||
/// # type Future = std::future::Ready<Result<Record, u8>>;
|
||||
/// #
|
||||
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
/// # Poll::Ready(Ok(()))
|
||||
/// # }
|
||||
/// #
|
||||
/// # fn call(&mut self, request: u32) -> Self::Future {
|
||||
/// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
|
||||
/// # std::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
|
||||
/// # }
|
||||
/// # }
|
||||
/// #
|
||||
@ -271,14 +275,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
|
||||
/// # impl Service<u32> for DatabaseService {
|
||||
/// # type Response = String;
|
||||
/// # type Error = Error;
|
||||
/// # type Future = futures_util::future::Ready<Result<String, Error>>;
|
||||
/// # type Future = std::future::Ready<Result<String, Error>>;
|
||||
/// #
|
||||
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
/// # Poll::Ready(Ok(()))
|
||||
/// # }
|
||||
/// #
|
||||
/// # fn call(&mut self, request: u32) -> Self::Future {
|
||||
/// # futures_util::future::ready(Ok(String::new()))
|
||||
/// # std::future::ready(Ok(String::new()))
|
||||
/// # }
|
||||
/// # }
|
||||
/// #
|
||||
@ -366,14 +370,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
|
||||
/// # impl Service<u32> for DatabaseService {
|
||||
/// # type Response = Vec<Record>;
|
||||
/// # type Error = DbError;
|
||||
/// # type Future = futures_util::future::Ready<Result<Vec<Record>, DbError>>;
|
||||
/// # type Future = std::future::Ready<Result<Vec<Record>, DbError>>;
|
||||
/// #
|
||||
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
/// # Poll::Ready(Ok(()))
|
||||
/// # }
|
||||
/// #
|
||||
/// # fn call(&mut self, request: u32) -> Self::Future {
|
||||
/// # futures_util::future::ready(Ok(vec![Record { name: "Jack".into(), age: 32 }]))
|
||||
/// # std::future::ready(Ok(vec![Record { name: "Jack".into(), age: 32 }]))
|
||||
/// # }
|
||||
/// # }
|
||||
/// #
|
||||
@ -426,14 +430,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
|
||||
/// # impl Service<u32> for DatabaseService {
|
||||
/// # type Response = Record;
|
||||
/// # type Error = DbError;
|
||||
/// # type Future = futures_util::future::Ready<Result<Record, DbError>>;
|
||||
/// # type Future = std::future::Ready<Result<Record, DbError>>;
|
||||
/// #
|
||||
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
/// # Poll::Ready(Ok(()))
|
||||
/// # }
|
||||
/// #
|
||||
/// # fn call(&mut self, request: u32) -> Self::Future {
|
||||
/// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
|
||||
/// # std::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
|
||||
/// # }
|
||||
/// # }
|
||||
/// #
|
||||
@ -490,14 +494,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
|
||||
/// # impl Service<u32> for DatabaseService {
|
||||
/// # type Response = String;
|
||||
/// # type Error = u8;
|
||||
/// # type Future = futures_util::future::Ready<Result<String, u8>>;
|
||||
/// # type Future = std::future::Ready<Result<String, u8>>;
|
||||
/// #
|
||||
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
/// # Poll::Ready(Ok(()))
|
||||
/// # }
|
||||
/// #
|
||||
/// # fn call(&mut self, request: u32) -> Self::Future {
|
||||
/// # futures_util::future::ready(Ok(String::new()))
|
||||
/// # std::future::ready(Ok(String::new()))
|
||||
/// # }
|
||||
/// # }
|
||||
/// #
|
||||
@ -561,14 +565,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
|
||||
/// # impl Service<String> for DatabaseService {
|
||||
/// # type Response = String;
|
||||
/// # type Error = u8;
|
||||
/// # type Future = futures_util::future::Ready<Result<String, u8>>;
|
||||
/// # type Future = std::future::Ready<Result<String, u8>>;
|
||||
/// #
|
||||
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
/// # Poll::Ready(Ok(()))
|
||||
/// # }
|
||||
/// #
|
||||
/// # fn call(&mut self, request: String) -> Self::Future {
|
||||
/// # futures_util::future::ready(Ok(String::new()))
|
||||
/// # std::future::ready(Ok(String::new()))
|
||||
/// # }
|
||||
/// # }
|
||||
/// #
|
||||
@ -629,14 +633,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
|
||||
/// # impl Service<u32> for DatabaseService {
|
||||
/// # type Response = String;
|
||||
/// # type Error = DbError;
|
||||
/// # type Future = futures_util::future::Ready<Result<String, DbError>>;
|
||||
/// # type Future = std::future::Ready<Result<String, DbError>>;
|
||||
/// #
|
||||
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
/// # Poll::Ready(Ok(()))
|
||||
/// # }
|
||||
/// #
|
||||
/// # fn call(&mut self, request: u32) -> Self::Future {
|
||||
/// # futures_util::future::ready(Ok(String::new()))
|
||||
/// # std::future::ready(Ok(String::new()))
|
||||
/// # }
|
||||
/// # }
|
||||
/// #
|
||||
@ -702,14 +706,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
|
||||
/// # impl Service<u32> for DatabaseService {
|
||||
/// # type Response = String;
|
||||
/// # type Error = DbError;
|
||||
/// # type Future = futures_util::future::Ready<Result<String, DbError>>;
|
||||
/// # type Future = std::future::Ready<Result<String, DbError>>;
|
||||
/// #
|
||||
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
/// # Poll::Ready(Ok(()))
|
||||
/// # }
|
||||
/// #
|
||||
/// # fn call(&mut self, request: u32) -> Self::Future {
|
||||
/// # futures_util::future::ready(Ok(String::new()))
|
||||
/// # std::future::ready(Ok(String::new()))
|
||||
/// # }
|
||||
/// # }
|
||||
/// #
|
||||
@ -804,14 +808,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
|
||||
/// # impl Service<u32> for DatabaseService {
|
||||
/// # type Response = Record;
|
||||
/// # type Error = DbError;
|
||||
/// # type Future = futures_util::future::Ready<Result<Record, DbError>>;
|
||||
/// # type Future = std::future::Ready<Result<Record, DbError>>;
|
||||
/// #
|
||||
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
/// # Poll::Ready(Ok(()))
|
||||
/// # }
|
||||
/// #
|
||||
/// # fn call(&mut self, request: u32) -> Self::Future {
|
||||
/// # futures_util::future::ready(Ok(()))
|
||||
/// # std::future::ready(Ok(()))
|
||||
/// # }
|
||||
/// # }
|
||||
/// #
|
||||
@ -890,14 +894,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
|
||||
/// # impl Service<u32> for DatabaseService {
|
||||
/// # type Response = Record;
|
||||
/// # type Error = DbError;
|
||||
/// # type Future = futures_util::future::Ready<Result<Record, DbError>>;
|
||||
/// # type Future = std::future::Ready<Result<Record, DbError>>;
|
||||
/// #
|
||||
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
/// # Poll::Ready(Ok(()))
|
||||
/// # }
|
||||
/// #
|
||||
/// # fn call(&mut self, request: u32) -> Self::Future {
|
||||
/// # futures_util::future::ready(Ok(()))
|
||||
/// # std::future::ready(Ok(()))
|
||||
/// # }
|
||||
/// # }
|
||||
/// #
|
||||
|
@ -1,10 +1,9 @@
|
||||
use futures_core::ready;
|
||||
use pin_project_lite::pin_project;
|
||||
use std::{
|
||||
fmt,
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
use tower_service::Service;
|
||||
|
||||
@ -89,7 +88,7 @@ where
|
||||
loop {
|
||||
match this.state.as_mut().project() {
|
||||
StateProj::NotReady { svc, req } => {
|
||||
let _ = ready!(svc.poll_ready(cx))?;
|
||||
ready!(svc.poll_ready(cx))?;
|
||||
let f = svc.call(req.take().expect("already called"));
|
||||
this.state.set(State::called(f));
|
||||
}
|
||||
|
@ -1,10 +1,9 @@
|
||||
use super::error;
|
||||
use futures_core::ready;
|
||||
use pin_project_lite::pin_project;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
|
||||
pin_project! {
|
||||
|
@ -1,10 +1,9 @@
|
||||
use std::{fmt, marker::PhantomData};
|
||||
|
||||
use futures_core::ready;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
use tower_service::Service;
|
||||
|
||||
@ -70,7 +69,7 @@ where
|
||||
pub struct Ready<'a, T, Request>(ReadyOneshot<&'a mut T, Request>);
|
||||
|
||||
// Safety: This is safe for the same reason that the impl for ReadyOneshot is safe.
|
||||
impl<'a, T, Request> Unpin for Ready<'a, T, Request> {}
|
||||
impl<T, Request> Unpin for Ready<'_, T, Request> {}
|
||||
|
||||
impl<'a, T, Request> Ready<'a, T, Request>
|
||||
where
|
||||
@ -93,7 +92,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T, Request> fmt::Debug for Ready<'a, T, Request>
|
||||
impl<T, Request> fmt::Debug for Ready<'_, T, Request>
|
||||
where
|
||||
T: fmt::Debug,
|
||||
{
|
||||
|
@ -25,7 +25,7 @@ pub trait Rng {
|
||||
/// Generate a random [`f64`] between `[0, 1)`.
|
||||
fn next_f64(&mut self) -> f64 {
|
||||
// Borrowed from:
|
||||
// https://github.com/rust-random/rand/blob/master/src/distributions/float.rs#L106
|
||||
// https://github.com/rust-random/rand/blob/master/src/distr/float.rs#L108
|
||||
let float_size = std::mem::size_of::<f64>() as u32 * 8;
|
||||
let precision = 52 + 1;
|
||||
let scale = 1.0 / ((1u64 << precision) as f64);
|
||||
@ -40,7 +40,7 @@ pub trait Rng {
|
||||
///
|
||||
/// # Panic
|
||||
///
|
||||
/// - If start < end this will panic in debug mode.
|
||||
/// - If `range.start >= range.end` this will panic in debug mode.
|
||||
fn next_range(&mut self, range: Range<u64>) -> u64 {
|
||||
debug_assert!(
|
||||
range.start < range.end,
|
||||
@ -116,6 +116,7 @@ where
|
||||
///
|
||||
/// ref: This was borrowed and modified from the following Rand implementation
|
||||
/// https://github.com/rust-random/rand/blob/b73640705d6714509f8ceccc49e8df996fa19f51/src/seq/index.rs#L375-L411
|
||||
#[cfg(feature = "balance")]
|
||||
pub(crate) fn sample_floyd2<R: Rng>(rng: &mut R, length: u64) -> [u64; 2] {
|
||||
debug_assert!(2 <= length);
|
||||
let aidx = rng.next_range(0..length - 1);
|
||||
@ -131,20 +132,24 @@ mod tests {
|
||||
|
||||
quickcheck! {
|
||||
fn next_f64(counter: u64) -> TestResult {
|
||||
let mut rng = HasherRng::default();
|
||||
rng.counter = counter;
|
||||
let mut rng = HasherRng {
|
||||
counter,
|
||||
..HasherRng::default()
|
||||
};
|
||||
let n = rng.next_f64();
|
||||
|
||||
TestResult::from_bool(n < 1.0 && n >= 0.0)
|
||||
TestResult::from_bool((0.0..1.0).contains(&n))
|
||||
}
|
||||
|
||||
fn next_range(counter: u64, range: Range<u64>) -> TestResult {
|
||||
if range.start >= range.end{
|
||||
if range.start >= range.end{
|
||||
return TestResult::discard();
|
||||
}
|
||||
|
||||
let mut rng = HasherRng::default();
|
||||
rng.counter = counter;
|
||||
let mut rng = HasherRng {
|
||||
counter,
|
||||
..HasherRng::default()
|
||||
};
|
||||
|
||||
let n = rng.next_range(range.clone());
|
||||
|
||||
@ -152,12 +157,14 @@ mod tests {
|
||||
}
|
||||
|
||||
fn sample_floyd2(counter: u64, length: u64) -> TestResult {
|
||||
if length < 2 || length > 256 {
|
||||
if !(2..=256).contains(&length) {
|
||||
return TestResult::discard();
|
||||
}
|
||||
|
||||
let mut rng = HasherRng::default();
|
||||
rng.counter = counter;
|
||||
let mut rng = HasherRng {
|
||||
counter,
|
||||
..HasherRng::default()
|
||||
};
|
||||
|
||||
let [a, b] = super::sample_floyd2(&mut rng, length);
|
||||
|
||||
|
@ -4,6 +4,7 @@ mod support;
|
||||
|
||||
use std::future::Future;
|
||||
use std::task::{Context, Poll};
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
use tokio_test::{assert_pending, assert_ready, task};
|
||||
use tower::balance::p2c::Balance;
|
||||
use tower::discover::Change;
|
||||
@ -28,7 +29,7 @@ impl Service<Req> for Mock {
|
||||
impl tower::load::Load for Mock {
|
||||
type Metric = usize;
|
||||
fn load(&self) -> Self::Metric {
|
||||
rand::random()
|
||||
rand::random_range(usize::MIN..=usize::MAX)
|
||||
}
|
||||
}
|
||||
|
||||
@ -37,15 +38,15 @@ fn stress() {
|
||||
let _t = support::trace_init();
|
||||
let mut task = task::spawn(());
|
||||
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Result<_, &'static str>>();
|
||||
let mut cache = Balance::<_, Req>::new(support::IntoStream::new(rx));
|
||||
let mut cache = Balance::<_, Req>::new(UnboundedReceiverStream::new(rx));
|
||||
|
||||
let mut nready = 0;
|
||||
let mut services = slab::Slab::<(mock::Handle<Req, Req>, bool)>::new();
|
||||
let mut retired = Vec::<mock::Handle<Req, Req>>::new();
|
||||
for _ in 0..100_000 {
|
||||
for _ in 0..(rand::random::<u8>() % 8) {
|
||||
for _ in 0..rand::random_range(0u8..8) {
|
||||
if !services.is_empty() && rand::random() {
|
||||
if nready == 0 || rand::random::<u8>() > u8::max_value() / 4 {
|
||||
if nready == 0 || rand::random::<u8>() > u8::MAX / 4 {
|
||||
// ready a service
|
||||
// TODO: sometimes ready a removed service?
|
||||
for (_, (handle, ready)) in &mut services {
|
||||
@ -114,7 +115,7 @@ fn stress() {
|
||||
} else {
|
||||
// remove
|
||||
while !services.is_empty() {
|
||||
let k = rand::random::<usize>() % (services.iter().last().unwrap().0 + 1);
|
||||
let k = rand::random_range(0..=services.iter().next_back().unwrap().0);
|
||||
if services.contains(k) {
|
||||
let (handle, ready) = services.remove(k);
|
||||
if ready {
|
||||
@ -129,7 +130,7 @@ fn stress() {
|
||||
} else {
|
||||
// fail a service
|
||||
while !services.is_empty() {
|
||||
let k = rand::random::<usize>() % (services.iter().last().unwrap().0 + 1);
|
||||
let k = rand::random_range(0..=services.iter().next_back().unwrap().0);
|
||||
if services.contains(k) {
|
||||
let (mut handle, ready) = services.remove(k);
|
||||
if ready {
|
||||
|
@ -1,7 +1,7 @@
|
||||
#![cfg(all(feature = "buffer", feature = "limit", feature = "retry"))]
|
||||
mod support;
|
||||
use futures_util::{future::Ready, pin_mut};
|
||||
use std::time::Duration;
|
||||
use futures_util::pin_mut;
|
||||
use std::{future::Ready, time::Duration};
|
||||
use tower::builder::ServiceBuilder;
|
||||
use tower::retry::Policy;
|
||||
use tower::util::ServiceExt;
|
||||
|
@ -1,8 +1,8 @@
|
||||
#![cfg(feature = "filter")]
|
||||
#[path = "../support.rs"]
|
||||
mod support;
|
||||
use futures_util::{future::poll_fn, pin_mut};
|
||||
use std::future::Future;
|
||||
use futures_util::future::pin_mut;
|
||||
use std::future::{poll_fn, Future};
|
||||
use tower::filter::{error::Error, AsyncFilter};
|
||||
use tower_service::Service;
|
||||
use tower_test::{assert_request_eq, mock};
|
||||
|
@ -1,5 +1,4 @@
|
||||
#[path = "../support.rs"]
|
||||
mod support;
|
||||
use crate::support;
|
||||
use tokio_test::{assert_pending, assert_ready, assert_ready_ok};
|
||||
use tower::limit::concurrency::ConcurrencyLimitLayer;
|
||||
use tower_test::{assert_request_eq, mock};
|
||||
|
@ -2,7 +2,8 @@
|
||||
#[path = "../support.rs"]
|
||||
mod support;
|
||||
|
||||
use futures_util::future;
|
||||
use std::future;
|
||||
|
||||
use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok, task};
|
||||
use tower::retry::Policy;
|
||||
use tower_test::{assert_request_eq, mock};
|
||||
|
@ -2,8 +2,10 @@
|
||||
#[path = "../support.rs"]
|
||||
mod support;
|
||||
|
||||
use futures_util::future::{ready, Ready};
|
||||
use std::task::{Context, Poll};
|
||||
use std::{
|
||||
future::{ready, Ready},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tower::steer::Steer;
|
||||
use tower_service::Service;
|
||||
|
||||
@ -35,9 +37,7 @@ async fn pick_correctly() {
|
||||
let srvs = vec![MyService(42, true), MyService(57, true)];
|
||||
let mut st = Steer::new(srvs, |_: &_, _: &[_]| 1);
|
||||
|
||||
futures_util::future::poll_fn(|cx| st.poll_ready(cx))
|
||||
.await
|
||||
.unwrap();
|
||||
std::future::poll_fn(|cx| st.poll_ready(cx)).await.unwrap();
|
||||
let r = st.call(String::from("foo")).await.unwrap();
|
||||
assert_eq!(r, 57);
|
||||
}
|
||||
@ -49,7 +49,7 @@ async fn pending_all_ready() {
|
||||
let srvs = vec![MyService(42, true), MyService(57, false)];
|
||||
let mut st = Steer::new(srvs, |_: &_, _: &[_]| 0);
|
||||
|
||||
let p = futures_util::poll!(futures_util::future::poll_fn(|cx| st.poll_ready(cx)));
|
||||
let p = futures_util::poll!(std::future::poll_fn(|cx| st.poll_ready(cx)));
|
||||
match p {
|
||||
Poll::Pending => (),
|
||||
_ => panic!(
|
||||
|
@ -1,11 +1,8 @@
|
||||
#![allow(dead_code)]
|
||||
|
||||
use futures::future;
|
||||
use std::fmt;
|
||||
use std::pin::Pin;
|
||||
use std::future;
|
||||
use std::task::{Context, Poll};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::Stream;
|
||||
use tower::Service;
|
||||
|
||||
pub(crate) fn trace_init() -> tracing::subscriber::DefaultGuard {
|
||||
@ -17,36 +14,6 @@ pub(crate) fn trace_init() -> tracing::subscriber::DefaultGuard {
|
||||
tracing::subscriber::set_default(subscriber)
|
||||
}
|
||||
|
||||
pin_project_lite::pin_project! {
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct IntoStream<S> {
|
||||
#[pin]
|
||||
inner: S
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> IntoStream<S> {
|
||||
pub fn new(inner: S) -> Self {
|
||||
Self { inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl<I> Stream for IntoStream<mpsc::Receiver<I>> {
|
||||
type Item = I;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
self.project().inner.poll_recv(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<I> Stream for IntoStream<mpsc::UnboundedReceiver<I>> {
|
||||
type Item = I;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
self.project().inner.poll_recv(cx)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AssertSpanSvc {
|
||||
span: tracing::Span,
|
||||
|
@ -1,13 +1,11 @@
|
||||
use super::support;
|
||||
use futures_core::Stream;
|
||||
use futures_util::{
|
||||
future::{ready, Ready},
|
||||
pin_mut,
|
||||
};
|
||||
use futures_util::pin_mut;
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::future::{ready, Future, Ready};
|
||||
use std::task::{Context, Poll};
|
||||
use std::{cell::Cell, rc::Rc};
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
use tokio_test::{assert_pending, assert_ready, task};
|
||||
use tower::util::ServiceExt;
|
||||
use tower_service::*;
|
||||
@ -53,7 +51,7 @@ fn ordered() {
|
||||
admit: admit.clone(),
|
||||
};
|
||||
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let ca = srv.call_all(support::IntoStream::new(rx));
|
||||
let ca = srv.call_all(UnboundedReceiverStream::new(rx));
|
||||
pin_mut!(ca);
|
||||
|
||||
assert_pending!(mock.enter(|cx, _| ca.as_mut().poll_next(cx)));
|
||||
@ -155,7 +153,7 @@ async fn pending() {
|
||||
let mut task = task::spawn(());
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let ca = mock.call_all(support::IntoStream::new(rx));
|
||||
let ca = mock.call_all(UnboundedReceiverStream::new(rx));
|
||||
pin_mut!(ca);
|
||||
|
||||
assert_pending!(task.enter(|cx, _| ca.as_mut().poll_next(cx)));
|
||||
|
@ -1,4 +1,4 @@
|
||||
use futures_util::future::ready;
|
||||
use std::future::ready;
|
||||
use tower::util::service_fn;
|
||||
use tower_service::Service;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user