Compare commits

...

27 Commits

Author SHA1 Message Date
ripytide
a1c277bc90
docs: correct rng pre-requisite comment (#835)
Fixes #826
2025-07-22 09:42:24 -04:00
ripytide
cd2bfa0f58
improve tower-layer docs (#834)
* improve tower-layer docs

* fix compile issues
2025-07-22 09:33:17 -04:00
cppforliving
9e901d450d
service: Improve unsized types' support (#650)
… and fix missing ready call in example.
2025-07-09 20:53:33 +02:00
ripytide
d21cdbf044
Fix clippy lints (#832) 2025-07-05 14:47:08 +00:00
tottoto
fcef5928a2
chore: Use tokio-stream UnboundedReceiverStream (#831) 2025-07-05 06:56:12 -04:00
Paolo Barbolini
fe3156587c
Bump rand to v0.9 (#811) 2025-07-03 09:52:53 +02:00
Gigabuidl
50d839d3b0
docs: update dead link in util/rng.rs (#820) 2025-07-02 11:13:32 -04:00
Icemic
9754acb5dc
fix: Use minimal tokio features (#828) 2025-06-30 05:37:12 +00:00
tottoto
b79c0c7497
chore: Remove unused dependency (#822) 2025-06-18 00:38:41 +02:00
Jonas Platte
6a3ab07b4c
style: address clippy lints (#827) 2025-06-12 13:30:19 -04:00
Tait Hoyem
ec81e5797b
no-std compatiblity for underlying traits (#810) 2025-06-01 00:02:07 +02:00
tottoto
81658e65ad
chore: Replace type related to future with standard library (#805) 2025-04-29 13:14:11 -07:00
katelyn martin
abb375d08c
chore: add Buffer breaking change to changelog (#819)
in #635, some subtle breaking changes were made to how `Buffer` works.

this is documented in the description of that PR, here:

> I had to change some of the integration tests slightly as part of this
> change. This is because the buffer implementation using semaphore
> permits is _very subtly_ different from one using a bounded channel. In
> the `Semaphore`-based implementation, a semaphore permit is stored in
> the `Message` struct sent over the channel. This is so that the capacity
> is used as long as the message is in flight. However, when the worker
> task is processing a message that's been recieved from the channel,
> the permit is still not dropped. Essentially, the one message actively
> held by the worker task _also_ occupies one "slot" of capacity, so the
> actual channel capacity is one less than the value passed to the
> constructor, _once the first request has been sent to the worker_. The
> bounded MPSC changed this behavior so that capacity is only occupied
> while a request is actually in the channel, which broke some tests
> that relied on the old (and technically wrong) behavior.

bear particular attention to this:

> The bounded MPSC changed this behavior so that capacity is only
> occupied while a request is actually in the channel, which broke some
> tests that relied on the old (and technically wrong) behavior.

this is a change in behavior that might affect downstream callers.

this commit adds mention of these changes to the changelog, to help
consumers navigate the upgrade from tower 0.4 to 0.5.

Signed-off-by: katelyn martin <me+cratelyn@katelyn.world>
2025-03-14 16:50:55 -04:00
katelyn martin
6c8d98b470
chore: add Buffer breaking changes to changelog (#818)
in #654, breaking changes were made to the `Buffer` type. this commit
adds mention of these breaking changes to the changelog, so that users
upgrading from 0.4 to 0.5 can have record of what changed, and why.
2025-03-12 18:34:40 -04:00
katelyn martin
fb646693bf
chore: note Budget breaking change in changelog (#817)
`Budget` is now a trait in the 0.5 release. this is a breaking change relative to the 0.4 release, where it was a concrete [struct](https://docs.rs/tower/0.4.13/tower/retry/budget/struct.Budget.html).

this commit updates the changelog to characterize this as a breaking change, rather than an additive change.
2025-03-11 17:04:17 -04:00
katelyn martin
ee149f0170
chore: fix broken links in changelog (#816)
this commit fixes some broken PR links in the changelog, related to the 0.5.2 release.
2025-03-11 16:52:01 -04:00
katelyn martin
aade4e34ff
chore: add breaking changes to changelog (#815)
in #637, breaking changes were made to the `Either<A, B>` service.

this commit adds documentation of these breaking changes to the changelog, so that users upgrading from 0.4 to 0.5 have record of what changed when, and why.
2025-03-11 16:42:38 -04:00
Carlos O'Ryan
954e4c7e8d
docs: bad documentation in ExponentialBackoffMaker (#809) 2024-12-27 11:42:55 +00:00
Jess Izen
34a6951a46
add ServiceBuilder::boxed_clone_sync helper (#804) 2024-12-20 18:34:40 -05:00
Sean McArthur
7dc533ef86 tower v0.5.2 2024-12-11 08:25:54 -05:00
Bhuwan Pandit
a09fd9742d
chore: fix dead code warning for 'Sealed' trait and 'sample_floyd2' func (#799) 2024-12-10 14:47:18 -05:00
Jess Izen
f57e31b0e6
Add util::BoxCloneSyncServiceLayer (#802)
cc #777
2024-12-10 14:31:45 -05:00
tim gretler
da24532017
Add util::BoxCloneSyncService (#777)
Closes #770
2024-12-10 13:38:18 -05:00
Elichai Turkel
6283f3aff1
Upgrade http and sync_wrapper dependencies (#788) 2024-11-19 11:49:39 -05:00
Jonas Platte
71551010ac
Prepare release of v0.5.1 (#791) 2024-08-21 19:36:33 -04:00
Arnaud Gourlay
b2c48b46a3
Bump dependency on tower-layer (#787) 2024-08-15 14:03:34 +00:00
Toby Lawrence
fec9e559e2
tower-layer: drop versions from dev dependencies (#782) 2024-08-13 12:48:56 -04:00
70 changed files with 603 additions and 267 deletions

View File

@ -41,7 +41,9 @@ jobs:
- name: "install Rust nightly"
uses: dtolnay/rust-toolchain@nightly
- name: Select minimal versions
run: cargo update -Z minimal-versions
run: |
cargo update -Z minimal-versions
cargo update -p lazy_static --precise 1.5.0
- name: Check
run: |
rustup default ${{ env.MSRV }}
@ -90,7 +92,9 @@ jobs:
- name: "install Rust nightly"
uses: dtolnay/rust-toolchain@nightly
- name: Select minimal versions
run: cargo update -Z minimal-versions
run: |
cargo update -Z minimal-versions
cargo update -p lazy_static --precise 1.5.0
- name: test
run: |
rustup default ${{ env.MSRV }}

View File

@ -12,16 +12,15 @@ futures = "0.3.22"
futures-core = "0.3.22"
futures-util = { version = "0.3.22", default-features = false }
hdrhistogram = { version = "7.0", default-features = false }
http = "0.2"
http = "1"
indexmap = "2.0.2"
lazy_static = "1.4.0"
pin-project-lite = "0.2.7"
quickcheck = "1"
rand = "0.8"
slab = "0.4"
sync_wrapper = "0.1.1"
rand = "0.9"
slab = "0.4.9"
sync_wrapper = "1"
tokio = "1.6.2"
tokio-stream = "0.1.0"
tokio-stream = "0.1.1"
tokio-test = "0.4"
tokio-util = { version = "0.7.0", default-features = false }
tracing = { version = "0.1.2", default-features = false }

View File

@ -35,6 +35,10 @@ Tower will keep a rolling MSRV (minimum supported Rust version) policy of **at
least** 6 months. When increasing the MSRV, the new Rust version must have been
released at least six months ago. The current MSRV is 1.64.0.
## `no_std`
`tower` itself is _not_ `no_std` compatible, but `tower-layer` and `tower-service` are.
## Getting Started
If you're brand new to Tower and want to start with the basics we recommend you

View File

@ -10,7 +10,7 @@ edition = "2018"
tower = { version = "0.4", path = "../tower", features = ["full"] }
tower-service = "0.3"
tokio = { version = "1.0", features = ["full"] }
rand = "0.8"
rand = "0.9"
pin-project = "1.0"
futures = "0.3.22"
tracing = "0.1"

View File

@ -22,5 +22,5 @@ edition = "2018"
[dependencies]
[dev-dependencies]
tower-service = { version = "0.3.0", path = "../tower-service" }
tower = { version = "0.5.0", path = "../tower" }
tower-service = { path = "../tower-service" }
tower = { path = "../tower" }

View File

@ -30,6 +30,8 @@ reusable components that can be applied to very different kinds of services;
for example, it can be applied to services operating on different protocols,
and to both the client and server side of a network transaction.
`tower-layer` is `no_std` compatible.
## License
This project is licensed under the [MIT license](LICENSE).
@ -40,4 +42,4 @@ Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.
[Tower]: https://crates.io/crates/tower
[Tower]: https://crates.io/crates/tower

View File

@ -1,5 +1,5 @@
use super::Layer;
use std::fmt;
use core::fmt;
/// A no-op middleware.
///
@ -7,21 +7,35 @@ use std::fmt;
/// service without modifying it.
///
/// [`Service`]: https://docs.rs/tower-service/latest/tower_service/trait.Service.html
///
/// # Examples
///
/// ```rust
/// use tower_layer::Identity;
/// use tower_layer::Layer;
///
/// let identity = Identity::new();
///
/// assert_eq!(identity.layer(42), 42);
/// ```
#[derive(Default, Clone)]
pub struct Identity {
_p: (),
}
impl Identity {
/// Create a new [`Identity`] value
/// Creates a new [`Identity`].
///
/// ```rust
/// use tower_layer::Identity;
///
/// let identity = Identity::new();
/// ```
pub const fn new() -> Identity {
Identity { _p: () }
}
}
/// Decorates a [`Service`], transforming either the request or the response.
///
/// [`Service`]: https://docs.rs/tower-service/latest/tower_service/trait.Service.html
impl<S> Layer<S> for Identity {
type Service = S;

View File

@ -1,22 +1,26 @@
use super::Layer;
use std::fmt;
use core::fmt;
/// Returns a new [`LayerFn`] that implements [`Layer`] by calling the
/// given function.
///
/// The [`Layer::layer`] method takes a type implementing [`Service`] and
/// The [`Layer::layer()`] method takes a type implementing [`Service`] and
/// returns a different type implementing [`Service`]. In many cases, this can
/// be implemented by a function or a closure. The [`LayerFn`] helper allows
/// writing simple [`Layer`] implementations without needing the boilerplate of
/// a new struct implementing [`Layer`].
///
/// # Example
/// [`Service`]: https://docs.rs/tower-service/latest/tower_service/trait.Service.html
/// [`Layer::layer()`]: crate::Layer::layer
///
/// # Examples
///
/// ```rust
/// # use tower::Service;
/// # use std::task::{Poll, Context};
/// # use core::task::{Poll, Context};
/// # use tower_layer::{Layer, layer_fn};
/// # use std::fmt;
/// # use std::convert::Infallible;
/// # use core::fmt;
/// # use core::convert::Infallible;
/// #
/// // A middleware that logs requests before forwarding them to another service
/// pub struct LogService<S> {
@ -61,9 +65,6 @@ use std::fmt;
/// // Wrap our service in a `LogService` so requests are logged.
/// let wrapped_service = log_layer.layer(uppercase_service);
/// ```
///
/// [`Service`]: https://docs.rs/tower-service/latest/tower_service/trait.Service.html
/// [`Layer::layer`]: crate::Layer::layer
pub fn layer_fn<T>(f: T) -> LayerFn<T> {
LayerFn { f }
}
@ -88,7 +89,7 @@ where
impl<F> fmt::Debug for LayerFn<F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("LayerFn")
.field("f", &format_args!("{}", std::any::type_name::<F>()))
.field("f", &format_args!("{}", core::any::type_name::<F>()))
.finish()
}
}
@ -96,6 +97,7 @@ impl<F> fmt::Debug for LayerFn<F> {
#[cfg(test)]
mod tests {
use super::*;
use alloc::{format, string::ToString};
#[allow(dead_code)]
#[test]
@ -108,7 +110,7 @@ mod tests {
assert_eq!(
"LayerFn { f: tower_layer::layer_fn::tests::layer_fn_has_useful_debug_impl::{{closure}} }".to_string(),
format!("{:?}", layer),
format!("{layer:?}"),
);
}
}

View File

@ -16,6 +16,11 @@
//!
//! [`Service`]: https://docs.rs/tower/*/tower/trait.Service.html
#![no_std]
#[cfg(test)]
extern crate alloc;
mod identity;
mod layer_fn;
mod stack;
@ -41,9 +46,9 @@ pub use self::{
///
/// ```rust
/// # use tower_service::Service;
/// # use std::task::{Poll, Context};
/// # use core::task::{Poll, Context};
/// # use tower_layer::Layer;
/// # use std::fmt;
/// # use core::fmt;
///
/// pub struct LogLayer {
/// target: &'static str,
@ -100,7 +105,7 @@ pub trait Layer<S> {
fn layer(&self, inner: S) -> Self::Service;
}
impl<'a, T, S> Layer<S> for &'a T
impl<T, S> Layer<S> for &T
where
T: ?Sized + Layer<S>,
{

View File

@ -1,7 +1,22 @@
use super::Layer;
use std::fmt;
use core::fmt;
/// Two middlewares chained together.
/// Two [`Layer`]s chained together.
///
/// # Examples
///
/// ```rust
/// use tower_layer::{Stack, layer_fn, Layer};
///
/// let inner = layer_fn(|service| service+2);
/// let outer = layer_fn(|service| service*2);
///
/// let inner_outer_stack = Stack::new(inner, outer);
///
/// // (4 + 2) * 2 = 12
/// // (4 * 2) + 2 = 10
/// assert_eq!(inner_outer_stack.layer(4), 12);
/// ```
#[derive(Clone)]
pub struct Stack<Inner, Outer> {
inner: Inner,
@ -9,7 +24,15 @@ pub struct Stack<Inner, Outer> {
}
impl<Inner, Outer> Stack<Inner, Outer> {
/// Create a new `Stack`.
/// Creates a new [`Stack`].
///
/// # Examples
///
/// ```rust
/// use tower_layer::{Stack, Identity};
///
/// let stack = Stack::new(Identity::new(), Identity::new());
/// ```
pub const fn new(inner: Inner, outer: Outer) -> Self {
Stack { inner, outer }
}

View File

@ -45,6 +45,11 @@ middleware may take actions such as modify the request.
[`Service`]: https://docs.rs/tower-service/latest/tower_service/trait.Service.html
[Tower]: https://crates.io/crates/tower
## `no_std`
`tower-service` is `no_std` compatible, but it (currently) requires the `alloc` crate.
## License
This project is licensed under the [MIT license](LICENSE).

View File

@ -13,8 +13,16 @@
//! request / response clients and servers. It is simple but powerful and is
//! used as the foundation for the rest of Tower.
use std::future::Future;
use std::task::{Context, Poll};
#![no_std]
extern crate alloc;
use alloc::boxed::Box;
use core::future::Future;
use core::marker::Sized;
use core::result::Result;
use core::task::{Context, Poll};
/// An asynchronous function from a `Request` to a `Response`.
///
@ -91,13 +99,14 @@ use std::task::{Context, Poll};
/// As an example, here is how a Redis request would be issued:
///
/// ```rust,ignore
/// let client = redis::Client::new()
/// let mut client = redis::Client::new()
/// .connect("127.0.0.1:6379".parse().unwrap())
/// .unwrap();
///
/// ServiceExt::<Cmd>::ready(&mut client).await?;
///
/// let resp = client.call(Cmd::set("foo", "this is the value of foo")).await?;
///
/// // Wait for the future to resolve
/// println!("Redis response: {:?}", resp);
/// ```
///
@ -273,7 +282,7 @@ use std::task::{Context, Poll};
/// }
/// ```
///
/// You should instead use [`std::mem::replace`] to take the service that was ready:
/// You should instead use [`core::mem::replace`] to take the service that was ready:
///
/// ```rust
/// # use std::pin::Pin;
@ -355,9 +364,9 @@ pub trait Service<Request> {
fn call(&mut self, req: Request) -> Self::Future;
}
impl<'a, S, Request> Service<Request> for &'a mut S
impl<S, Request> Service<Request> for &mut S
where
S: Service<Request> + 'a,
S: Service<Request> + ?Sized,
{
type Response = S::Response;
type Error = S::Error;

View File

@ -20,7 +20,6 @@ categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
futures-util = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
tokio-test = { workspace = true }
tower-layer = { version = "0.3", path = "../tower-layer" }

View File

@ -1,14 +1,13 @@
//! Future types
use crate::mock::error::{self, Error};
use futures_util::ready;
use pin_project_lite::pin_project;
use tokio::sync::oneshot;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
pin_project! {

View File

@ -18,7 +18,6 @@ use std::{
future::Future,
sync::{Arc, Mutex},
task::{Context, Poll},
u64,
};
/// Spawn a layer onto a mock service.

View File

@ -5,6 +5,24 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
# 0.5.2
### Added
- **util**: Add `BoxCloneSyncService` which is a `Clone + Send + Sync` boxed `Service` ([#777])
- **util**: Add `BoxCloneSyncServiceLayer` which is a `Clone + Send + Sync` boxed `Layer` ([802])
[#777]: https://github.com/tower-rs/tower/pull/777
[#802]: https://github.com/tower-rs/tower/pull/802
# 0.5.1
### Fixed
- Fix minimum version of `tower-layer` dependency ([#787])
[#787]: https://github.com/tower-rs/tower/pull/787
# 0.5.0
### Fixed
@ -19,7 +37,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
increases the flexibility of the retry policy. To update, update your method signature to include `mut` for both parameters. ([#584])
- **retry**: **Breaking Change** Change Policy to accept &mut self ([#681])
- **retry**: Add generic backoff utilities ([#685])
- **retry**: Add Budget trait. This allows end-users to implement their own budget and bucket implementations. ([#703])
- **retry**: **Breaking Change** `Budget` is now a trait. This allows end-users to implement their own budget and bucket implementations. ([#703])
- **reconnect**: **Breaking Change** Remove unused generic parameter from `Reconnect::new` ([#755])
- **ready-cache**: Allow iteration over ready services ([#700])
- **discover**: Implement `Clone` for Change ([#701])
@ -28,6 +46,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **filter**: Derive `Clone` for `AsyncFilterLayer` ([#731])
- **general**: Update IndexMap ([#741])
- **MSRV**: Increase MSRV to 1.63.0 ([#741])
- **util**: **Breaking Change** `Either::A` and `Either::B` have been renamed `Either::Left` and `Either::Right`, respectively. ([#637])
- **util**: **Breaking Change** `Either` now requires its two services to have the same error type. ([#637])
- **util**: **Breaking Change** `Either` no longer implemenmts `Future`. ([#637])
- **buffer**: **Breaking Change** `Buffer<S, Request>` is now generic over `Buffer<Request, S::Future>.` ([#654])
- **buffer**: **Breaking Change** `Buffer`'s capacity now correctly matches the specified size. Previously, the
capacity was subtly off-by-one, because a slot was held even while the worker task was processing a message. ([#635])
[#702]: https://github.com/tower-rs/tower/pull/702
[#652]: https://github.com/tower-rs/tower/pull/652
@ -42,6 +66,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[#716]: https://github.com/tower-rs/tower/pull/716
[#731]: https://github.com/tower-rs/tower/pull/731
[#741]: https://github.com/tower-rs/tower/pull/741
[#637]: https://github.com/tower-rs/tower/pull/637
[#654]: https://github.com/tower-rs/tower/pull/654
[#635]: https://github.com/tower-rs/tower/pull/635
# 0.4.12 (February 16, 2022)

View File

@ -1,18 +1,15 @@
[package]
name = "tower"
# When releasing to crates.io:
# - Update doc url
# - Cargo.toml
# - README.md
# - Update README.md
# - Update CHANGELOG.md.
# - Create "vX.X.X" git tag.
version = "0.5.0"
version = "0.5.2"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower/0.5.0"
description = """
Tower is a library of modular and reusable components for building robust
clients and servers.
@ -23,10 +20,6 @@ edition = "2018"
rust-version = "1.64.0"
[features]
# Internal
__common = ["futures-core", "pin-project-lite"]
full = [
"balance",
"buffer",
@ -48,33 +41,33 @@ full = [
# FIXME: Use weak dependency once available (https://github.com/rust-lang/cargo/issues/8832)
log = ["tracing/log"]
balance = ["discover", "load", "ready-cache", "make", "slab", "util"]
buffer = ["__common", "tokio/sync", "tokio/rt", "tokio-util", "tracing"]
discover = ["__common"]
filter = ["__common", "futures-util"]
buffer = ["tokio/sync", "tokio/rt", "tokio-util", "tracing", "pin-project-lite"]
discover = ["futures-core", "pin-project-lite"]
filter = ["futures-util", "pin-project-lite"]
hedge = ["util", "filter", "futures-util", "hdrhistogram", "tokio/time", "tracing"]
limit = ["__common", "tokio/time", "tokio/sync", "tokio-util", "tracing"]
load = ["__common", "tokio/time", "tracing"]
load-shed = ["__common"]
make = ["futures-util", "pin-project-lite", "tokio/io-std"]
limit = ["tokio/time", "tokio/sync", "tokio-util", "tracing", "pin-project-lite"]
load = ["tokio/time", "tracing", "pin-project-lite"]
load-shed = ["pin-project-lite"]
make = ["pin-project-lite", "tokio"]
ready-cache = ["futures-core", "futures-util", "indexmap", "tokio/sync", "tracing", "pin-project-lite"]
reconnect = ["make", "tokio/io-std", "tracing"]
retry = ["__common", "tokio/time", "util"]
spawn-ready = ["__common", "futures-util", "tokio/sync", "tokio/rt", "util", "tracing"]
reconnect = ["make", "tracing"]
retry = ["tokio/time", "util"]
spawn-ready = ["futures-util", "tokio/sync", "tokio/rt", "util", "tracing"]
steer = []
timeout = ["pin-project-lite", "tokio/time"]
util = ["__common", "futures-util", "pin-project-lite", "sync_wrapper"]
util = ["futures-core", "futures-util", "pin-project-lite", "sync_wrapper"]
tokio-stream = [] # TODO: Remove this feature at the next breaking release.
[dependencies]
tower-layer = { version = "0.3.1", path = "../tower-layer" }
tower-service = { version = "0.3.1", path = "../tower-service" }
tower-layer = { version = "0.3.3", path = "../tower-layer" }
tower-service = { version = "0.3.3", path = "../tower-service" }
futures-core = { workspace = true, optional = true }
futures-util = { workspace = true, features = ["alloc"], optional = true }
hdrhistogram = { workspace = true, optional = true }
indexmap = { workspace = true, optional = true }
slab = { workspace = true, optional = true }
tokio = { workspace = true, features = ["sync"], optional = true }
tokio-stream = { workspace = true, optional = true }
tokio = { workspace = true, optional = true }
tokio-util = { workspace = true, optional = true }
tracing = { workspace = true, features = ["std"], optional = true }
pin-project-lite = { workspace = true, optional = true }
@ -83,7 +76,6 @@ sync_wrapper = { workspace = true, optional = true }
[dev-dependencies]
futures = { workspace = true }
hdrhistogram = { workspace = true }
pin-project-lite = { workspace = true }
tokio = { workspace = true, features = ["macros", "sync", "test-util", "rt-multi-thread"] }
tokio-stream = { workspace = true }
tokio-test = { workspace = true }
@ -91,7 +83,6 @@ tower-test = { version = "0.4", path = "../tower-test" }
tracing = { workspace = true, features = ["std"] }
tracing-subscriber = { workspace = true, features = ["fmt", "ansi"] }
http = { workspace = true }
lazy_static = { workspace = true }
rand = { workspace = true, features = ["small_rng"] }
quickcheck = { workspace = true }

View File

@ -141,14 +141,14 @@ To get started using all of Tower's optional middleware, add this to your
`Cargo.toml`:
```toml
tower = { version = "0.5.0", features = ["full"] }
tower = { version = "0.5.1", features = ["full"] }
```
Alternatively, you can only enable some features. For example, to enable
only the [`retry`] and [`timeout`][timeouts] middleware, write:
```toml
tower = { version = "0.5.0", features = ["retry", "timeout"] }
tower = { version = "0.5.1", features = ["retry", "timeout"] }
```
See [here][all_layers] for a complete list of all middleware provided by

View File

@ -4,7 +4,6 @@ use futures_core::{Stream, TryStream};
use futures_util::{stream, stream::StreamExt, stream::TryStreamExt};
use hdrhistogram::Histogram;
use pin_project_lite::pin_project;
use rand::{self, Rng};
use std::hash::Hash;
use std::time::Duration;
use std::{
@ -46,13 +45,13 @@ struct Summary {
async fn main() {
tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::default()).unwrap();
println!("REQUESTS={}", REQUESTS);
println!("CONCURRENCY={}", CONCURRENCY);
println!("ENDPOINT_CAPACITY={}", ENDPOINT_CAPACITY);
println!("REQUESTS={REQUESTS}");
println!("CONCURRENCY={CONCURRENCY}");
println!("ENDPOINT_CAPACITY={ENDPOINT_CAPACITY}");
print!("MAX_ENDPOINT_LATENCIES=[");
for max in &MAX_ENDPOINT_LATENCIES {
let l = max.as_secs() * 1_000 + u64::from(max.subsec_millis());
print!("{}ms, ", l);
print!("{l}ms, ");
}
println!("]");
@ -124,7 +123,7 @@ fn gen_disco() -> impl Discover<
let maxms = u64::from(latency.subsec_millis())
.saturating_add(latency.as_secs().saturating_mul(1_000));
let latency = Duration::from_millis(rand::thread_rng().gen_range(0..maxms));
let latency = Duration::from_millis(rand::random_range(0..maxms));
async move {
time::sleep_until(start + latency).await;
@ -149,7 +148,7 @@ where
<D::Service as Service<Req>>::Future: Send,
<D::Service as load::Load>::Metric: std::fmt::Debug,
{
println!("{}", name);
println!("{name}");
let requests = stream::repeat(Req).take(REQUESTS);
let service = ConcurrencyLimit::new(lb, CONCURRENCY);
@ -193,7 +192,7 @@ impl Summary {
}
for (i, c) in self.count_by_instance.iter().enumerate() {
let p = *c as f64 / total as f64 * 100.0;
println!(" [{:02}] {:>5.01}%", i, p);
println!(" [{i:02}] {p:>5.01}%");
}
println!(" wall {:4}s", self.start.elapsed().as_secs());

View File

@ -1,6 +1,5 @@
use super::Balance;
use crate::discover::Discover;
use futures_core::ready;
use pin_project_lite::pin_project;
use std::hash::Hash;
use std::marker::PhantomData;
@ -8,7 +7,7 @@ use std::{
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tower_service::Service;

View File

@ -3,14 +3,13 @@ use crate::discover::{Change, Discover};
use crate::load::Load;
use crate::ready_cache::{error::Failed, ReadyCache};
use crate::util::rng::{sample_floyd2, HasherRng, Rng};
use futures_core::ready;
use futures_util::future::{self, TryFutureExt};
use std::hash::Hash;
use std::marker::PhantomData;
use std::{
fmt,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tower_service::Service;
use tracing::{debug, trace};

View File

@ -3,12 +3,11 @@
//! [`Buffer`]: crate::buffer::Buffer
use super::{error::Closed, message};
use futures_core::ready;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
pin_project! {

View File

@ -56,7 +56,7 @@ where
}
impl<Request> fmt::Debug for BufferLayer<Request> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BufferLayer")
.field("bound", &self.bound)
.finish()
@ -65,10 +65,7 @@ impl<Request> fmt::Debug for BufferLayer<Request> {
impl<Request> Clone for BufferLayer<Request> {
fn clone(&self) -> Self {
Self {
bound: self.bound,
_p: PhantomData,
}
*self
}
}

View File

@ -2,12 +2,11 @@ use super::{
error::{Closed, ServiceError},
message::Message,
};
use futures_core::ready;
use std::sync::{Arc, Mutex};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tokio::sync::mpsc;
use tower_service::Service;

View File

@ -635,7 +635,7 @@ impl<L> ServiceBuilder<L> {
/// impl Service<Request> for MyService {
/// type Response = Response;
/// type Error = Error;
/// type Future = futures_util::future::Ready<Result<Response, Error>>;
/// type Future = std::future::Ready<Result<Response, Error>>;
///
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// // ...
@ -789,6 +789,68 @@ impl<L> ServiceBuilder<L> {
{
self.layer(crate::util::BoxCloneService::layer())
}
/// This wraps the inner service with the [`Layer`] returned by [`BoxCloneSyncServiceLayer`].
///
/// This is similar to the [`boxed_clone`] method, but it requires that `Self` implement
/// [`Sync`], and the returned boxed service implements [`Sync`].
///
/// See [`BoxCloneSyncService`] for more details.
///
/// # Example
///
/// ```
/// use tower::{Service, ServiceBuilder, BoxError, util::BoxCloneSyncService};
/// use std::time::Duration;
/// #
/// # struct Request;
/// # struct Response;
/// # impl Response {
/// # fn new() -> Self { Self }
/// # }
///
/// let service: BoxCloneSyncService<Request, Response, BoxError> = ServiceBuilder::new()
/// .load_shed()
/// .concurrency_limit(64)
/// .timeout(Duration::from_secs(10))
/// .boxed_clone_sync()
/// .service_fn(|req: Request| async {
/// Ok::<_, BoxError>(Response::new())
/// });
/// # let service = assert_service(service);
///
/// // The boxed service can still be cloned.
/// service.clone();
/// # fn assert_service<S, R>(svc: S) -> S
/// # where S: Service<R> { svc }
/// ```
///
/// [`BoxCloneSyncServiceLayer`]: crate::util::BoxCloneSyncServiceLayer
/// [`BoxCloneSyncService`]: crate::util::BoxCloneSyncService
/// [`boxed_clone`]: Self::boxed_clone
#[cfg(feature = "util")]
pub fn boxed_clone_sync<S, R>(
self,
) -> ServiceBuilder<
Stack<
crate::util::BoxCloneSyncServiceLayer<
S,
R,
<L::Service as Service<R>>::Response,
<L::Service as Service<R>>::Error,
>,
Identity,
>,
>
where
L: Layer<S> + Send + Sync + 'static,
L::Service: Service<R> + Clone + Send + Sync + 'static,
<L::Service as Service<R>>::Future: Send + Sync + 'static,
{
let layer = self.into_inner();
ServiceBuilder::new().layer(crate::util::BoxCloneSyncServiceLayer::new(layer))
}
}
impl<L: fmt::Debug> fmt::Debug for ServiceBuilder<L> {

View File

@ -12,7 +12,8 @@
//! # Examples
//!
//! ```rust
//! use futures_util::{future::poll_fn, pin_mut};
//! use std::future::poll_fn;
//! use futures_util::pin_mut;
//! use tower::discover::{Change, Discover};
//! async fn services_monitor<D: Discover>(services: D) {
//! pin_mut!(services);

View File

@ -2,12 +2,11 @@
use super::AsyncPredicate;
use crate::BoxError;
use futures_core::ready;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tower_service::Service;
@ -35,7 +34,7 @@ opaque_future! {
/// [`Filter`]: crate::filter::Filter
pub type ResponseFuture<R, F> =
futures_util::future::Either<
futures_util::future::Ready<Result<R, crate::BoxError>>,
std::future::Ready<Result<R, crate::BoxError>>,
futures_util::future::ErrInto<F, crate::BoxError>
>;
}

View File

@ -114,7 +114,7 @@ where
fn call(&mut self, request: Request) -> Self::Future {
ResponseFuture::new(match self.predicate.check(request) {
Ok(request) => Either::Right(self.inner.call(request).err_into()),
Err(e) => Either::Left(futures_util::future::ready(Err(e))),
Err(e) => Either::Left(std::future::ready(Err(e))),
})
}
}

View File

@ -1,10 +1,9 @@
use futures_util::ready;
use pin_project_lite::pin_project;
use std::time::Duration;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tower_service::Service;

View File

@ -1,10 +1,9 @@
use futures_util::ready;
use pin_project_lite::pin_project;
use std::time::Duration;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tokio::time::Instant;
use tower_service::Service;

View File

@ -4,11 +4,12 @@
#![warn(missing_debug_implementations, missing_docs, unreachable_pub)]
use crate::filter::AsyncFilter;
use futures_util::future;
use futures_util::future::Either;
use pin_project_lite::pin_project;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{
future,
pin::Pin,
task::{Context, Poll},
};
@ -222,7 +223,7 @@ impl<P, Request> crate::filter::AsyncPredicate<Request> for PolicyPredicate<P>
where
P: Policy<Request>,
{
type Future = future::Either<
type Future = Either<
future::Ready<Result<Request, crate::BoxError>>,
future::Pending<Result<Request, crate::BoxError>>,
>;
@ -230,13 +231,13 @@ where
fn check(&mut self, request: Request) -> Self::Future {
if self.0.can_retry(&request) {
future::Either::Left(future::ready(Ok(request)))
Either::Left(future::ready(Ok(request)))
} else {
// If the hedge retry should not be issued, we simply want to wait
// for the result of the original request. Therefore we don't want
// to return an error here. Instead, we use future::pending to ensure
// that the original request wins the select.
future::Either::Right(future::pending())
Either::Right(future::pending())
}
}
}

View File

@ -219,6 +219,7 @@ pub use tower_layer::Layer;
pub use tower_service::Service;
#[allow(unreachable_pub)]
#[cfg(any(feature = "balance", feature = "discover", feature = "make"))]
mod sealed {
pub trait Sealed<T> {}
}

View File

@ -1,12 +1,11 @@
//! [`Future`] types
//!
//! [`Future`]: std::future::Future
use futures_core::ready;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tokio::sync::OwnedSemaphorePermit;

View File

@ -3,10 +3,9 @@ use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::PollSemaphore;
use tower_service::Service;
use futures_core::ready;
use std::{
sync::Arc,
task::{Context, Poll},
task::{ready, Context, Poll},
};
/// Enforces a limit on the concurrent number of requests the underlying

View File

@ -1,9 +1,8 @@
use super::Rate;
use futures_core::ready;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tokio::time::{Instant, Sleep};
use tower_service::Service;

View File

@ -1,11 +1,10 @@
//! Application-specific request completion semantics.
use futures_core::ready;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
/// Attaches `H`-typed completion tracker to `V` typed values.

View File

@ -3,9 +3,9 @@
#[cfg(feature = "discover")]
use crate::discover::{Change, Discover};
#[cfg(feature = "discover")]
use futures_core::{ready, Stream};
use futures_core::Stream;
#[cfg(feature = "discover")]
use std::pin::Pin;
use std::{pin::Pin, task::ready};
use super::Load;
use pin_project_lite::pin_project;

View File

@ -3,11 +3,11 @@
#[cfg(feature = "discover")]
use crate::discover::{Change, Discover};
#[cfg(feature = "discover")]
use futures_core::{ready, Stream};
use futures_core::Stream;
#[cfg(feature = "discover")]
use pin_project_lite::pin_project;
#[cfg(feature = "discover")]
use std::pin::Pin;
use std::{pin::Pin, task::ready};
use super::completion::{CompleteOnResponse, TrackCompletion, TrackCompletionFuture};
use super::Load;
@ -309,8 +309,7 @@ fn nanos(d: Duration) -> f64 {
#[cfg(test)]
mod tests {
use futures_util::future;
use std::time::Duration;
use std::{future, time::Duration};
use tokio::time;
use tokio_test::{assert_ready, assert_ready_ok, task};
@ -327,7 +326,7 @@ mod tests {
}
fn call(&mut self, (): ()) -> Self::Future {
future::ok(())
future::ready(Ok(()))
}
}
@ -399,7 +398,7 @@ mod tests {
assert_eq!(super::nanos(Duration::new(0, 123)), 123.0);
assert_eq!(super::nanos(Duration::new(1, 23)), 1_000_000_023.0);
assert_eq!(
super::nanos(Duration::new(::std::u64::MAX, 999_999_999)),
super::nanos(Duration::new(u64::MAX, 999_999_999)),
18446744074709553000.0
);
}

View File

@ -3,11 +3,11 @@
#[cfg(feature = "discover")]
use crate::discover::{Change, Discover};
#[cfg(feature = "discover")]
use futures_core::{ready, Stream};
use futures_core::Stream;
#[cfg(feature = "discover")]
use pin_project_lite::pin_project;
#[cfg(feature = "discover")]
use std::pin::Pin;
use std::{pin::Pin, task::ready};
use super::completion::{CompleteOnResponse, TrackCompletion, TrackCompletionFuture};
use super::Load;
@ -148,8 +148,10 @@ impl RefCount {
#[cfg(test)]
mod tests {
use super::*;
use futures_util::future;
use std::task::{Context, Poll};
use std::{
future,
task::{Context, Poll},
};
struct Svc;
impl Service<()> for Svc {
@ -162,7 +164,7 @@ mod tests {
}
fn call(&mut self, (): ()) -> Self::Future {
future::ok(())
future::ready(Ok(()))
}
}

View File

@ -3,9 +3,8 @@
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};
use futures_core::ready;
use pin_project_lite::pin_project;
use super::error::Overloaded;

View File

@ -118,7 +118,7 @@ pub trait MakeService<Target, Request>: Sealed<(Target, Request)> {
/// # };
/// # }
/// ```
fn as_service(&mut self) -> AsService<Self, Request>
fn as_service(&mut self) -> AsService<'_, Self, Request>
where
Self: Sized,
{

View File

@ -12,10 +12,10 @@ use tower_service::Service;
/// # use std::task::{Context, Poll};
/// # use std::pin::Pin;
/// # use std::convert::Infallible;
/// use std::future::{Ready, ready};
/// use tower::make::{MakeService, Shared};
/// use tower::buffer::Buffer;
/// use tower::Service;
/// use futures::future::{Ready, ready};
///
/// // An example connection type
/// struct Connection {}
@ -93,13 +93,13 @@ where
}
fn call(&mut self, _target: T) -> Self::Future {
SharedFuture::new(futures_util::future::ready(Ok(self.service.clone())))
SharedFuture::new(std::future::ready(Ok(self.service.clone())))
}
}
opaque_future! {
/// Response future from [`Shared`] services.
pub type SharedFuture<S> = futures_util::future::Ready<Result<S, Infallible>>;
pub type SharedFuture<S> = std::future::Ready<Result<S, Infallible>>;
}
#[cfg(test)]
@ -107,7 +107,7 @@ mod tests {
use super::*;
use crate::make::MakeService;
use crate::service_fn;
use futures::future::poll_fn;
use std::future::poll_fn;
async fn echo<R>(req: R) -> Result<R, Infallible> {
Ok(req)

View File

@ -79,10 +79,10 @@ where
///
/// Returns a config validation error if:
/// - `min` > `max`
/// - `max` > 0
/// - `jitter` >= `0.0`
/// - `jitter` < `100.0`
/// - `jitter` is finite
/// - `max` == 0
/// - `jitter` < `0.0`
/// - `jitter` > `100.0`
/// - `jitter` is not finite
pub fn new(
min: time::Duration,
max: time::Duration,

View File

@ -27,9 +27,8 @@
//! # Examples
//!
//! ```rust
//! use std::sync::Arc;
//! use std::{future, sync::Arc};
//!
//! use futures_util::future;
//! use tower::retry::{budget::{Budget, TpsBudget}, Policy};
//!
//! type Req = String;

View File

@ -72,7 +72,7 @@ impl TpsBudget {
assert!(ttl <= Duration::from_secs(60));
assert!(retry_percent >= 0.0);
assert!(retry_percent <= 1000.0);
assert!(min_per_sec < ::std::i32::MAX as u32);
assert!(min_per_sec < i32::MAX as u32);
let (deposit_amount, withdraw_amount) = if retry_percent == 0.0 {
// If there is no percent, then you gain nothing from deposits.

View File

@ -1,11 +1,10 @@
//! Future types
use super::{Policy, Retry};
use futures_core::ready;
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};
use tower_service::Service;
pin_project! {

View File

@ -6,7 +6,7 @@ use std::future::Future;
///
/// ```
/// use tower::retry::Policy;
/// use futures_util::future;
/// use std::future;
///
/// type Req = String;
/// type Res = String;
@ -91,4 +91,4 @@ pub trait Policy<Req, Res, E> {
// Ensure `Policy` is object safe
#[cfg(test)]
fn _obj_safe(_: Box<dyn Policy<(), (), (), Future = futures::future::Ready<()>>>) {}
fn _obj_safe(_: Box<dyn Policy<(), (), (), Future = std::future::Ready<()>>>) {}

View File

@ -1,11 +1,10 @@
use super::{future::ResponseFuture, SpawnReadyLayer};
use crate::{util::ServiceExt, BoxError};
use futures_core::ready;
use futures_util::future::TryFutureExt;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tower_service::Service;
use tracing::Instrument;

View File

@ -8,9 +8,8 @@
//! Here, `GET /` will be sent to the `root` service, while all other requests go to `not_found`.
//!
//! ```rust
//! # use std::task::{Context, Poll};
//! # use std::task::{Context, Poll, ready};
//! # use tower_service::Service;
//! # use futures_util::future::{ready, Ready, poll_fn};
//! # use tower::steer::Steer;
//! # use tower::service_fn;
//! # use tower::util::BoxService;

View File

@ -0,0 +1,129 @@
use std::{fmt, sync::Arc};
use tower_layer::{layer_fn, Layer};
use tower_service::Service;
use crate::util::BoxCloneSyncService;
/// A [`Clone`] + [`Send`] + [`Sync`] boxed [`Layer`].
///
/// [`BoxCloneSyncServiceLayer`] turns a layer into a trait object, allowing both the [`Layer`] itself
/// and the output [`Service`] to be dynamic, while having consistent types.
///
/// This [`Layer`] produces [`BoxCloneSyncService`] instances erasing the type of the
/// [`Service`] produced by the wrapped [`Layer`].
///
/// This is similar to [`BoxCloneServiceLayer`](super::BoxCloneServiceLayer) except the layer and resulting
/// service implements [`Sync`].
///
/// # Example
///
/// `BoxCloneSyncServiceLayer` can, for example, be useful to create layers dynamically that otherwise wouldn't have
/// the same types, when the underlying service must be clone and sync (for example, when building a Hyper connector).
/// In this example, we include a [`Timeout`] layer only if an environment variable is set. We can use
/// `BoxCloneSyncServiceLayer` to return a consistent type regardless of runtime configuration:
///
/// ```
/// use std::time::Duration;
/// use tower::{Service, ServiceBuilder, BoxError};
/// use tower::util::{BoxCloneSyncServiceLayer, BoxCloneSyncService};
///
/// #
/// # struct Request;
/// # struct Response;
/// # impl Response {
/// # fn new() -> Self { Self }
/// # }
///
/// fn common_layer<S, T>() -> BoxCloneSyncServiceLayer<S, T, S::Response, BoxError>
/// where
/// S: Service<T> + Clone + Send + Sync + 'static,
/// S::Future: Send + 'static,
/// S::Error: Into<BoxError> + 'static,
/// {
/// let builder = ServiceBuilder::new()
/// .concurrency_limit(100);
///
/// if std::env::var("SET_TIMEOUT").is_ok() {
/// let layer = builder
/// .timeout(Duration::from_secs(30))
/// .into_inner();
///
/// BoxCloneSyncServiceLayer::new(layer)
/// } else {
/// let layer = builder
/// .map_err(Into::into)
/// .into_inner();
///
/// BoxCloneSyncServiceLayer::new(layer)
/// }
/// }
///
/// // We can clone the layer (this is true of BoxLayer as well)
/// let boxed_clone_sync_layer = common_layer();
///
/// let cloned_sync_layer = boxed_clone_sync_layer.clone();
///
/// // Using the `BoxCloneSyncServiceLayer` we can create a `BoxCloneSyncService`
/// let service: BoxCloneSyncService<Request, Response, BoxError> = ServiceBuilder::new().layer(cloned_sync_layer)
/// .service_fn(|req: Request| async {
/// Ok::<_, BoxError>(Response::new())
/// });
///
/// # let service = assert_service(service);
///
/// // And we can still clone the service
/// let cloned_service = service.clone();
/// #
/// # fn assert_service<S, R>(svc: S) -> S
/// # where S: Service<R> { svc }
///
/// ```
///
/// [`Layer`]: tower_layer::Layer
/// [`Service`]: tower_service::Service
/// [`BoxService`]: super::BoxService
/// [`Timeout`]: crate::timeout
pub struct BoxCloneSyncServiceLayer<In, T, U, E> {
boxed: Arc<dyn Layer<In, Service = BoxCloneSyncService<T, U, E>> + Send + Sync + 'static>,
}
impl<In, T, U, E> BoxCloneSyncServiceLayer<In, T, U, E> {
/// Create a new [`BoxCloneSyncServiceLayer`].
pub fn new<L>(inner_layer: L) -> Self
where
L: Layer<In> + Send + Sync + 'static,
L::Service: Service<T, Response = U, Error = E> + Send + Sync + Clone + 'static,
<L::Service as Service<T>>::Future: Send + 'static,
{
let layer = layer_fn(move |inner: In| {
let out = inner_layer.layer(inner);
BoxCloneSyncService::new(out)
});
Self {
boxed: Arc::new(layer),
}
}
}
impl<In, T, U, E> Layer<In> for BoxCloneSyncServiceLayer<In, T, U, E> {
type Service = BoxCloneSyncService<T, U, E>;
fn layer(&self, inner: In) -> Self::Service {
self.boxed.layer(inner)
}
}
impl<In, T, U, E> Clone for BoxCloneSyncServiceLayer<In, T, U, E> {
fn clone(&self) -> Self {
Self {
boxed: Arc::clone(&self.boxed),
}
}
}
impl<In, T, U, E> fmt::Debug for BoxCloneSyncServiceLayer<In, T, U, E> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("BoxCloneSyncServiceLayer").finish()
}
}

View File

@ -1,9 +1,11 @@
mod layer;
mod layer_clone;
mod layer_clone_sync;
mod sync;
mod unsync;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::{
layer::BoxLayer, layer_clone::BoxCloneServiceLayer, sync::BoxService, unsync::UnsyncBoxService,
layer::BoxLayer, layer_clone::BoxCloneServiceLayer, layer_clone_sync::BoxCloneSyncServiceLayer,
sync::BoxService, unsync::UnsyncBoxService,
};

View File

@ -28,7 +28,7 @@ use std::{
/// # Examples
///
/// ```
/// use futures_util::future::ready;
/// use std::future::ready;
/// # use tower_service::Service;
/// # use tower::util::{BoxService, service_fn};
/// // Respond to requests using a closure, but closures cannot be named...

View File

@ -0,0 +1,101 @@
use super::ServiceExt;
use futures_util::future::BoxFuture;
use std::{
fmt,
task::{Context, Poll},
};
use tower_layer::{layer_fn, LayerFn};
use tower_service::Service;
/// A [`Clone`] + [`Send`] + [`Sync`] boxed [`Service`].
///
/// [`BoxCloneSyncService`] turns a service into a trait object, allowing the
/// response future type to be dynamic, and allowing the service to be cloned and shared.
///
/// This is similar to [`BoxCloneService`](super::BoxCloneService) except the resulting
/// service implements [`Sync`].
/// ```
pub struct BoxCloneSyncService<T, U, E>(
Box<
dyn CloneService<T, Response = U, Error = E, Future = BoxFuture<'static, Result<U, E>>>
+ Send
+ Sync,
>,
);
impl<T, U, E> BoxCloneSyncService<T, U, E> {
/// Create a new `BoxCloneSyncService`.
pub fn new<S>(inner: S) -> Self
where
S: Service<T, Response = U, Error = E> + Clone + Send + Sync + 'static,
S::Future: Send + 'static,
{
let inner = inner.map_future(|f| Box::pin(f) as _);
BoxCloneSyncService(Box::new(inner))
}
/// Returns a [`Layer`] for wrapping a [`Service`] in a [`BoxCloneSyncService`]
/// middleware.
///
/// [`Layer`]: crate::Layer
pub fn layer<S>() -> LayerFn<fn(S) -> Self>
where
S: Service<T, Response = U, Error = E> + Clone + Send + Sync + 'static,
S::Future: Send + 'static,
{
layer_fn(Self::new)
}
}
impl<T, U, E> Service<T> for BoxCloneSyncService<T, U, E> {
type Response = U;
type Error = E;
type Future = BoxFuture<'static, Result<U, E>>;
#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), E>> {
self.0.poll_ready(cx)
}
#[inline]
fn call(&mut self, request: T) -> Self::Future {
self.0.call(request)
}
}
impl<T, U, E> Clone for BoxCloneSyncService<T, U, E> {
fn clone(&self) -> Self {
Self(self.0.clone_box())
}
}
trait CloneService<R>: Service<R> {
fn clone_box(
&self,
) -> Box<
dyn CloneService<R, Response = Self::Response, Error = Self::Error, Future = Self::Future>
+ Send
+ Sync,
>;
}
impl<R, T> CloneService<R> for T
where
T: Service<R> + Send + Sync + Clone + 'static,
{
fn clone_box(
&self,
) -> Box<
dyn CloneService<R, Response = T::Response, Error = T::Error, Future = T::Future>
+ Send
+ Sync,
> {
Box::new(self.clone())
}
}
impl<T, U, E> fmt::Debug for BoxCloneSyncService<T, U, E> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("BoxCloneSyncService").finish()
}
}

View File

@ -1,10 +1,10 @@
use futures_core::{ready, Stream};
use futures_core::Stream;
use pin_project_lite::pin_project;
use std::{
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tower_service::Service;

View File

@ -24,7 +24,7 @@ pin_project! {
/// # use std::error::Error;
/// # use std::rc::Rc;
/// #
/// use futures::future::{ready, Ready};
/// use std::future::{ready, Ready};
/// use futures::StreamExt;
/// use futures::channel::mpsc;
/// use tower_service::Service;

View File

@ -154,7 +154,7 @@ where
self.state = match &mut self.state {
State::Future(fut) => {
let fut = Pin::new(fut);
let svc = futures_core::ready!(fut.poll(cx)?);
let svc = std::task::ready!(fut.poll(cx)?);
State::Service(svc)
}
State::Service(svc) => return svc.poll_ready(cx),
@ -176,22 +176,24 @@ mod tests {
use super::*;
use crate::util::{future_service, ServiceExt};
use crate::Service;
use futures::future::{ready, Ready};
use std::convert::Infallible;
use std::{
convert::Infallible,
future::{ready, Ready},
};
#[tokio::test]
async fn pending_service_debug_impl() {
let mut pending_svc = future_service(ready(Ok(DebugService)));
assert_eq!(
format!("{:?}", pending_svc),
"FutureService { state: State::Future(<futures_util::future::ready::Ready<core::result::Result<tower::util::future_service::tests::DebugService, core::convert::Infallible>>>) }"
format!("{pending_svc:?}"),
"FutureService { state: State::Future(<core::future::ready::Ready<core::result::Result<tower::util::future_service::tests::DebugService, core::convert::Infallible>>>) }"
);
pending_svc.ready().await.unwrap();
assert_eq!(
format!("{:?}", pending_svc),
format!("{pending_svc:?}"),
"FutureService { state: State::Service(DebugService) }"
);
}

View File

@ -3,6 +3,7 @@
mod and_then;
mod boxed;
mod boxed_clone;
mod boxed_clone_sync;
mod call_all;
mod either;
@ -23,8 +24,11 @@ pub mod rng;
pub use self::{
and_then::{AndThen, AndThenLayer},
boxed::{BoxCloneServiceLayer, BoxLayer, BoxService, UnsyncBoxService},
boxed::{
BoxCloneServiceLayer, BoxCloneSyncServiceLayer, BoxLayer, BoxService, UnsyncBoxService,
},
boxed_clone::BoxCloneService,
boxed_clone_sync::BoxCloneSyncService,
either::Either,
future_service::{future_service, FutureService},
map_err::{MapErr, MapErrLayer},
@ -135,14 +139,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
/// # impl Service<u32> for DatabaseService {
/// # type Response = Record;
/// # type Error = u8;
/// # type Future = futures_util::future::Ready<Result<Record, u8>>;
/// # type Future = std::future::Ready<Result<Record, u8>>;
/// #
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// # Poll::Ready(Ok(()))
/// # }
/// #
/// # fn call(&mut self, request: u32) -> Self::Future {
/// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
/// # std::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
/// # }
/// # }
/// #
@ -204,14 +208,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
/// # impl Service<u32> for DatabaseService {
/// # type Response = Record;
/// # type Error = u8;
/// # type Future = futures_util::future::Ready<Result<Record, u8>>;
/// # type Future = std::future::Ready<Result<Record, u8>>;
/// #
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// # Poll::Ready(Ok(()))
/// # }
/// #
/// # fn call(&mut self, request: u32) -> Self::Future {
/// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
/// # std::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
/// # }
/// # }
/// #
@ -271,14 +275,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
/// # impl Service<u32> for DatabaseService {
/// # type Response = String;
/// # type Error = Error;
/// # type Future = futures_util::future::Ready<Result<String, Error>>;
/// # type Future = std::future::Ready<Result<String, Error>>;
/// #
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// # Poll::Ready(Ok(()))
/// # }
/// #
/// # fn call(&mut self, request: u32) -> Self::Future {
/// # futures_util::future::ready(Ok(String::new()))
/// # std::future::ready(Ok(String::new()))
/// # }
/// # }
/// #
@ -366,14 +370,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
/// # impl Service<u32> for DatabaseService {
/// # type Response = Vec<Record>;
/// # type Error = DbError;
/// # type Future = futures_util::future::Ready<Result<Vec<Record>, DbError>>;
/// # type Future = std::future::Ready<Result<Vec<Record>, DbError>>;
/// #
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// # Poll::Ready(Ok(()))
/// # }
/// #
/// # fn call(&mut self, request: u32) -> Self::Future {
/// # futures_util::future::ready(Ok(vec![Record { name: "Jack".into(), age: 32 }]))
/// # std::future::ready(Ok(vec![Record { name: "Jack".into(), age: 32 }]))
/// # }
/// # }
/// #
@ -426,14 +430,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
/// # impl Service<u32> for DatabaseService {
/// # type Response = Record;
/// # type Error = DbError;
/// # type Future = futures_util::future::Ready<Result<Record, DbError>>;
/// # type Future = std::future::Ready<Result<Record, DbError>>;
/// #
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// # Poll::Ready(Ok(()))
/// # }
/// #
/// # fn call(&mut self, request: u32) -> Self::Future {
/// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
/// # std::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
/// # }
/// # }
/// #
@ -490,14 +494,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
/// # impl Service<u32> for DatabaseService {
/// # type Response = String;
/// # type Error = u8;
/// # type Future = futures_util::future::Ready<Result<String, u8>>;
/// # type Future = std::future::Ready<Result<String, u8>>;
/// #
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// # Poll::Ready(Ok(()))
/// # }
/// #
/// # fn call(&mut self, request: u32) -> Self::Future {
/// # futures_util::future::ready(Ok(String::new()))
/// # std::future::ready(Ok(String::new()))
/// # }
/// # }
/// #
@ -561,14 +565,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
/// # impl Service<String> for DatabaseService {
/// # type Response = String;
/// # type Error = u8;
/// # type Future = futures_util::future::Ready<Result<String, u8>>;
/// # type Future = std::future::Ready<Result<String, u8>>;
/// #
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// # Poll::Ready(Ok(()))
/// # }
/// #
/// # fn call(&mut self, request: String) -> Self::Future {
/// # futures_util::future::ready(Ok(String::new()))
/// # std::future::ready(Ok(String::new()))
/// # }
/// # }
/// #
@ -629,14 +633,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
/// # impl Service<u32> for DatabaseService {
/// # type Response = String;
/// # type Error = DbError;
/// # type Future = futures_util::future::Ready<Result<String, DbError>>;
/// # type Future = std::future::Ready<Result<String, DbError>>;
/// #
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// # Poll::Ready(Ok(()))
/// # }
/// #
/// # fn call(&mut self, request: u32) -> Self::Future {
/// # futures_util::future::ready(Ok(String::new()))
/// # std::future::ready(Ok(String::new()))
/// # }
/// # }
/// #
@ -702,14 +706,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
/// # impl Service<u32> for DatabaseService {
/// # type Response = String;
/// # type Error = DbError;
/// # type Future = futures_util::future::Ready<Result<String, DbError>>;
/// # type Future = std::future::Ready<Result<String, DbError>>;
/// #
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// # Poll::Ready(Ok(()))
/// # }
/// #
/// # fn call(&mut self, request: u32) -> Self::Future {
/// # futures_util::future::ready(Ok(String::new()))
/// # std::future::ready(Ok(String::new()))
/// # }
/// # }
/// #
@ -804,14 +808,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
/// # impl Service<u32> for DatabaseService {
/// # type Response = Record;
/// # type Error = DbError;
/// # type Future = futures_util::future::Ready<Result<Record, DbError>>;
/// # type Future = std::future::Ready<Result<Record, DbError>>;
/// #
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// # Poll::Ready(Ok(()))
/// # }
/// #
/// # fn call(&mut self, request: u32) -> Self::Future {
/// # futures_util::future::ready(Ok(()))
/// # std::future::ready(Ok(()))
/// # }
/// # }
/// #
@ -890,14 +894,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
/// # impl Service<u32> for DatabaseService {
/// # type Response = Record;
/// # type Error = DbError;
/// # type Future = futures_util::future::Ready<Result<Record, DbError>>;
/// # type Future = std::future::Ready<Result<Record, DbError>>;
/// #
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// # Poll::Ready(Ok(()))
/// # }
/// #
/// # fn call(&mut self, request: u32) -> Self::Future {
/// # futures_util::future::ready(Ok(()))
/// # std::future::ready(Ok(()))
/// # }
/// # }
/// #

View File

@ -1,10 +1,9 @@
use futures_core::ready;
use pin_project_lite::pin_project;
use std::{
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tower_service::Service;
@ -89,7 +88,7 @@ where
loop {
match this.state.as_mut().project() {
StateProj::NotReady { svc, req } => {
let _ = ready!(svc.poll_ready(cx))?;
ready!(svc.poll_ready(cx))?;
let f = svc.call(req.take().expect("already called"));
this.state.set(State::called(f));
}

View File

@ -1,10 +1,9 @@
use super::error;
use futures_core::ready;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
pin_project! {

View File

@ -1,10 +1,9 @@
use std::{fmt, marker::PhantomData};
use futures_core::ready;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tower_service::Service;
@ -70,7 +69,7 @@ where
pub struct Ready<'a, T, Request>(ReadyOneshot<&'a mut T, Request>);
// Safety: This is safe for the same reason that the impl for ReadyOneshot is safe.
impl<'a, T, Request> Unpin for Ready<'a, T, Request> {}
impl<T, Request> Unpin for Ready<'_, T, Request> {}
impl<'a, T, Request> Ready<'a, T, Request>
where
@ -93,7 +92,7 @@ where
}
}
impl<'a, T, Request> fmt::Debug for Ready<'a, T, Request>
impl<T, Request> fmt::Debug for Ready<'_, T, Request>
where
T: fmt::Debug,
{

View File

@ -25,7 +25,7 @@ pub trait Rng {
/// Generate a random [`f64`] between `[0, 1)`.
fn next_f64(&mut self) -> f64 {
// Borrowed from:
// https://github.com/rust-random/rand/blob/master/src/distributions/float.rs#L106
// https://github.com/rust-random/rand/blob/master/src/distr/float.rs#L108
let float_size = std::mem::size_of::<f64>() as u32 * 8;
let precision = 52 + 1;
let scale = 1.0 / ((1u64 << precision) as f64);
@ -40,7 +40,7 @@ pub trait Rng {
///
/// # Panic
///
/// - If start < end this will panic in debug mode.
/// - If `range.start >= range.end` this will panic in debug mode.
fn next_range(&mut self, range: Range<u64>) -> u64 {
debug_assert!(
range.start < range.end,
@ -116,6 +116,7 @@ where
///
/// ref: This was borrowed and modified from the following Rand implementation
/// https://github.com/rust-random/rand/blob/b73640705d6714509f8ceccc49e8df996fa19f51/src/seq/index.rs#L375-L411
#[cfg(feature = "balance")]
pub(crate) fn sample_floyd2<R: Rng>(rng: &mut R, length: u64) -> [u64; 2] {
debug_assert!(2 <= length);
let aidx = rng.next_range(0..length - 1);
@ -131,20 +132,24 @@ mod tests {
quickcheck! {
fn next_f64(counter: u64) -> TestResult {
let mut rng = HasherRng::default();
rng.counter = counter;
let mut rng = HasherRng {
counter,
..HasherRng::default()
};
let n = rng.next_f64();
TestResult::from_bool(n < 1.0 && n >= 0.0)
TestResult::from_bool((0.0..1.0).contains(&n))
}
fn next_range(counter: u64, range: Range<u64>) -> TestResult {
if range.start >= range.end{
if range.start >= range.end{
return TestResult::discard();
}
let mut rng = HasherRng::default();
rng.counter = counter;
let mut rng = HasherRng {
counter,
..HasherRng::default()
};
let n = rng.next_range(range.clone());
@ -152,12 +157,14 @@ mod tests {
}
fn sample_floyd2(counter: u64, length: u64) -> TestResult {
if length < 2 || length > 256 {
if !(2..=256).contains(&length) {
return TestResult::discard();
}
let mut rng = HasherRng::default();
rng.counter = counter;
let mut rng = HasherRng {
counter,
..HasherRng::default()
};
let [a, b] = super::sample_floyd2(&mut rng, length);

View File

@ -4,6 +4,7 @@ mod support;
use std::future::Future;
use std::task::{Context, Poll};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_test::{assert_pending, assert_ready, task};
use tower::balance::p2c::Balance;
use tower::discover::Change;
@ -28,7 +29,7 @@ impl Service<Req> for Mock {
impl tower::load::Load for Mock {
type Metric = usize;
fn load(&self) -> Self::Metric {
rand::random()
rand::random_range(usize::MIN..=usize::MAX)
}
}
@ -37,15 +38,15 @@ fn stress() {
let _t = support::trace_init();
let mut task = task::spawn(());
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Result<_, &'static str>>();
let mut cache = Balance::<_, Req>::new(support::IntoStream::new(rx));
let mut cache = Balance::<_, Req>::new(UnboundedReceiverStream::new(rx));
let mut nready = 0;
let mut services = slab::Slab::<(mock::Handle<Req, Req>, bool)>::new();
let mut retired = Vec::<mock::Handle<Req, Req>>::new();
for _ in 0..100_000 {
for _ in 0..(rand::random::<u8>() % 8) {
for _ in 0..rand::random_range(0u8..8) {
if !services.is_empty() && rand::random() {
if nready == 0 || rand::random::<u8>() > u8::max_value() / 4 {
if nready == 0 || rand::random::<u8>() > u8::MAX / 4 {
// ready a service
// TODO: sometimes ready a removed service?
for (_, (handle, ready)) in &mut services {
@ -114,7 +115,7 @@ fn stress() {
} else {
// remove
while !services.is_empty() {
let k = rand::random::<usize>() % (services.iter().last().unwrap().0 + 1);
let k = rand::random_range(0..=services.iter().next_back().unwrap().0);
if services.contains(k) {
let (handle, ready) = services.remove(k);
if ready {
@ -129,7 +130,7 @@ fn stress() {
} else {
// fail a service
while !services.is_empty() {
let k = rand::random::<usize>() % (services.iter().last().unwrap().0 + 1);
let k = rand::random_range(0..=services.iter().next_back().unwrap().0);
if services.contains(k) {
let (mut handle, ready) = services.remove(k);
if ready {

View File

@ -1,7 +1,7 @@
#![cfg(all(feature = "buffer", feature = "limit", feature = "retry"))]
mod support;
use futures_util::{future::Ready, pin_mut};
use std::time::Duration;
use futures_util::pin_mut;
use std::{future::Ready, time::Duration};
use tower::builder::ServiceBuilder;
use tower::retry::Policy;
use tower::util::ServiceExt;

View File

@ -1,8 +1,8 @@
#![cfg(feature = "filter")]
#[path = "../support.rs"]
mod support;
use futures_util::{future::poll_fn, pin_mut};
use std::future::Future;
use futures_util::future::pin_mut;
use std::future::{poll_fn, Future};
use tower::filter::{error::Error, AsyncFilter};
use tower_service::Service;
use tower_test::{assert_request_eq, mock};

View File

@ -1,5 +1,4 @@
#[path = "../support.rs"]
mod support;
use crate::support;
use tokio_test::{assert_pending, assert_ready, assert_ready_ok};
use tower::limit::concurrency::ConcurrencyLimitLayer;
use tower_test::{assert_request_eq, mock};

View File

@ -2,7 +2,8 @@
#[path = "../support.rs"]
mod support;
use futures_util::future;
use std::future;
use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok, task};
use tower::retry::Policy;
use tower_test::{assert_request_eq, mock};

View File

@ -2,8 +2,10 @@
#[path = "../support.rs"]
mod support;
use futures_util::future::{ready, Ready};
use std::task::{Context, Poll};
use std::{
future::{ready, Ready},
task::{Context, Poll},
};
use tower::steer::Steer;
use tower_service::Service;
@ -35,9 +37,7 @@ async fn pick_correctly() {
let srvs = vec![MyService(42, true), MyService(57, true)];
let mut st = Steer::new(srvs, |_: &_, _: &[_]| 1);
futures_util::future::poll_fn(|cx| st.poll_ready(cx))
.await
.unwrap();
std::future::poll_fn(|cx| st.poll_ready(cx)).await.unwrap();
let r = st.call(String::from("foo")).await.unwrap();
assert_eq!(r, 57);
}
@ -49,7 +49,7 @@ async fn pending_all_ready() {
let srvs = vec![MyService(42, true), MyService(57, false)];
let mut st = Steer::new(srvs, |_: &_, _: &[_]| 0);
let p = futures_util::poll!(futures_util::future::poll_fn(|cx| st.poll_ready(cx)));
let p = futures_util::poll!(std::future::poll_fn(|cx| st.poll_ready(cx)));
match p {
Poll::Pending => (),
_ => panic!(

View File

@ -1,11 +1,8 @@
#![allow(dead_code)]
use futures::future;
use std::fmt;
use std::pin::Pin;
use std::future;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
use tokio_stream::Stream;
use tower::Service;
pub(crate) fn trace_init() -> tracing::subscriber::DefaultGuard {
@ -17,36 +14,6 @@ pub(crate) fn trace_init() -> tracing::subscriber::DefaultGuard {
tracing::subscriber::set_default(subscriber)
}
pin_project_lite::pin_project! {
#[derive(Clone, Debug)]
pub struct IntoStream<S> {
#[pin]
inner: S
}
}
impl<S> IntoStream<S> {
pub fn new(inner: S) -> Self {
Self { inner }
}
}
impl<I> Stream for IntoStream<mpsc::Receiver<I>> {
type Item = I;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().inner.poll_recv(cx)
}
}
impl<I> Stream for IntoStream<mpsc::UnboundedReceiver<I>> {
type Item = I;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().inner.poll_recv(cx)
}
}
#[derive(Clone, Debug)]
pub struct AssertSpanSvc {
span: tracing::Span,

View File

@ -1,13 +1,11 @@
use super::support;
use futures_core::Stream;
use futures_util::{
future::{ready, Ready},
pin_mut,
};
use futures_util::pin_mut;
use std::fmt;
use std::future::Future;
use std::future::{ready, Future, Ready};
use std::task::{Context, Poll};
use std::{cell::Cell, rc::Rc};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_test::{assert_pending, assert_ready, task};
use tower::util::ServiceExt;
use tower_service::*;
@ -53,7 +51,7 @@ fn ordered() {
admit: admit.clone(),
};
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let ca = srv.call_all(support::IntoStream::new(rx));
let ca = srv.call_all(UnboundedReceiverStream::new(rx));
pin_mut!(ca);
assert_pending!(mock.enter(|cx, _| ca.as_mut().poll_next(cx)));
@ -155,7 +153,7 @@ async fn pending() {
let mut task = task::spawn(());
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let ca = mock.call_all(support::IntoStream::new(rx));
let ca = mock.call_all(UnboundedReceiverStream::new(rx));
pin_mut!(ca);
assert_pending!(task.enter(|cx, _| ca.as_mut().poll_next(cx)));

View File

@ -1,4 +1,4 @@
use futures_util::future::ready;
use std::future::ready;
use tower::util::service_fn;
use tower_service::Service;