From fcef5928a21c52b26d0e8cc8f2b1e1036fd3995e Mon Sep 17 00:00:00 2001 From: tottoto Date: Sat, 5 Jul 2025 19:56:12 +0900 Subject: [PATCH] chore: Use tokio-stream UnboundedReceiverStream (#831) --- Cargo.toml | 2 +- tower/Cargo.toml | 1 - tower/tests/balance/main.rs | 3 ++- tower/tests/support.rs | 33 --------------------------------- tower/tests/util/call_all.rs | 5 +++-- 5 files changed, 6 insertions(+), 38 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 688c1871..86c6a220 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ rand = "0.9" slab = "0.4.9" sync_wrapper = "1" tokio = "1.6.2" -tokio-stream = "0.1.0" +tokio-stream = "0.1.1" tokio-test = "0.4" tokio-util = { version = "0.7.0", default-features = false } tracing = { version = "0.1.2", default-features = false } diff --git a/tower/Cargo.toml b/tower/Cargo.toml index 178e94bc..1ffb5e4f 100644 --- a/tower/Cargo.toml +++ b/tower/Cargo.toml @@ -76,7 +76,6 @@ sync_wrapper = { workspace = true, optional = true } [dev-dependencies] futures = { workspace = true } hdrhistogram = { workspace = true } -pin-project-lite = { workspace = true } tokio = { workspace = true, features = ["macros", "sync", "test-util", "rt-multi-thread"] } tokio-stream = { workspace = true } tokio-test = { workspace = true } diff --git a/tower/tests/balance/main.rs b/tower/tests/balance/main.rs index 9675e3f1..66538eca 100644 --- a/tower/tests/balance/main.rs +++ b/tower/tests/balance/main.rs @@ -4,6 +4,7 @@ mod support; use std::future::Future; use std::task::{Context, Poll}; +use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_test::{assert_pending, assert_ready, task}; use tower::balance::p2c::Balance; use tower::discover::Change; @@ -37,7 +38,7 @@ fn stress() { let _t = support::trace_init(); let mut task = task::spawn(()); let (tx, rx) = tokio::sync::mpsc::unbounded_channel::>(); - let mut cache = Balance::<_, Req>::new(support::IntoStream::new(rx)); + let mut cache = Balance::<_, Req>::new(UnboundedReceiverStream::new(rx)); let mut nready = 0; let mut services = slab::Slab::<(mock::Handle, bool)>::new(); diff --git a/tower/tests/support.rs b/tower/tests/support.rs index 9ff8d37a..e58716c6 100644 --- a/tower/tests/support.rs +++ b/tower/tests/support.rs @@ -2,10 +2,7 @@ use std::fmt; use std::future; -use std::pin::Pin; use std::task::{Context, Poll}; -use tokio::sync::mpsc; -use tokio_stream::Stream; use tower::Service; pub(crate) fn trace_init() -> tracing::subscriber::DefaultGuard { @@ -17,36 +14,6 @@ pub(crate) fn trace_init() -> tracing::subscriber::DefaultGuard { tracing::subscriber::set_default(subscriber) } -pin_project_lite::pin_project! { - #[derive(Clone, Debug)] - pub struct IntoStream { - #[pin] - inner: S - } -} - -impl IntoStream { - pub fn new(inner: S) -> Self { - Self { inner } - } -} - -impl Stream for IntoStream> { - type Item = I; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_recv(cx) - } -} - -impl Stream for IntoStream> { - type Item = I; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_recv(cx) - } -} - #[derive(Clone, Debug)] pub struct AssertSpanSvc { span: tracing::Span, diff --git a/tower/tests/util/call_all.rs b/tower/tests/util/call_all.rs index 7389ed7c..feff91f6 100644 --- a/tower/tests/util/call_all.rs +++ b/tower/tests/util/call_all.rs @@ -5,6 +5,7 @@ use std::fmt; use std::future::{ready, Future, Ready}; use std::task::{Context, Poll}; use std::{cell::Cell, rc::Rc}; +use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_test::{assert_pending, assert_ready, task}; use tower::util::ServiceExt; use tower_service::*; @@ -50,7 +51,7 @@ fn ordered() { admit: admit.clone(), }; let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let ca = srv.call_all(support::IntoStream::new(rx)); + let ca = srv.call_all(UnboundedReceiverStream::new(rx)); pin_mut!(ca); assert_pending!(mock.enter(|cx, _| ca.as_mut().poll_next(cx))); @@ -152,7 +153,7 @@ async fn pending() { let mut task = task::spawn(()); let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let ca = mock.call_all(support::IntoStream::new(rx)); + let ca = mock.call_all(UnboundedReceiverStream::new(rx)); pin_mut!(ca); assert_pending!(task.enter(|cx, _| ca.as_mut().poll_next(cx)));