From ec547b329b65e5e29e1bb880293c9d2c56073f50 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 10 Feb 2021 15:01:58 -0800 Subject: [PATCH] spawn_ready: propagate `tracing` spans (#557) Currently, when using `SpawnReady`, the current `tracing` span is not propagated to the spawned task when the inner service is not ready. This means that any traces emitted by the inner service's `poll_ready` occur in their own root span, rather than the span the `SpawnReady` service was polled in. This branch fixes this by propagating the current trace span to the spawned task. This means that "spawn-ready" now enables the "tracing" feature. In the future, we may want to consider feature-flagging `tracing` separately from the middleware implementation that contain tracing instrumentation, but doing so would break traces if the feature flag isn't enabled. This doesn't break API compatibility, but it *does* break functionality, so we may not want to do that until the next breaking change release. I also added tests for span propagation. I realized the same test could easily be applied to `Buffer`, which also propagates `tracing` spans, to guard against regressions, so I also added a test for buffer. Signed-off-by: Eliza Weisman --- tower/CHANGELOG.md | 6 ++- tower/Cargo.toml | 2 +- tower/src/spawn_ready/service.rs | 4 +- tower/tests/buffer/main.rs | 19 +++++++++ tower/tests/spawn_ready/main.rs | 18 ++++++++- tower/tests/support.rs | 67 ++++++++++++++++++++++++++++++++ 6 files changed, 111 insertions(+), 5 deletions(-) diff --git a/tower/CHANGELOG.md b/tower/CHANGELOG.md index 0f3ae813..a4c1205e 100644 --- a/tower/CHANGELOG.md +++ b/tower/CHANGELOG.md @@ -11,12 +11,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `tracing` is now only pulled in for the features that need it. - **util**: Add `option_layer` to convert an `Option` into a `Layer`. ([#555]) - **builder**: Add `ServiceBuilder::option_layer` to optionally add a layer. ([#555]) -- **steer**: `Steer` now implements `Clone +- **spawn-ready**: SpawnReady now propagates the current `tracing` span to + spawned tasks ([#557]) +- **steer**: `Steer` now implements `Clone`. - **make**: Added `Shared` which lets you implement `MakeService` by cloning a service. [#542]: https://github.com/tower-rs/tower/pull/542 [#555]: https://github.com/tower-rs/tower/pull/555 +[#557]: https://github.com/tower-rs/tower/pull/557 # 0.4.4 (January 20, 2021) @@ -36,7 +39,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 [#532]: https://github.com/tower-rs/tower/pull/532 [#535]: https://github.com/tower-rs/tower/pull/535 [#538]: https://github.com/tower-rs/tower/pull/538 ->>>>>>> master # 0.4.3 (January 13, 2021) diff --git a/tower/Cargo.toml b/tower/Cargo.toml index fa35c402..78097291 100644 --- a/tower/Cargo.toml +++ b/tower/Cargo.toml @@ -56,7 +56,7 @@ make = ["tokio/io-std", "futures-util"] ready-cache = ["futures-util", "indexmap", "tokio/sync", "tracing"] reconnect = ["make", "tokio/io-std", "tracing"] retry = ["tokio/time"] -spawn-ready = ["futures-util", "tokio/sync", "tokio/rt", "util"] +spawn-ready = ["futures-util", "tokio/sync", "tokio/rt", "util", "tracing"] steer = ["futures-util"] timeout = ["tokio/time"] util = ["futures-util"] diff --git a/tower/src/spawn_ready/service.rs b/tower/src/spawn_ready/service.rs index 71f84bc1..e9a3b93f 100644 --- a/tower/src/spawn_ready/service.rs +++ b/tower/src/spawn_ready/service.rs @@ -8,6 +8,7 @@ use std::{ task::{Context, Poll}, }; use tower_service::Service; +use tracing::Instrument; /// Spawns tasks to drive an inner service to readiness. /// @@ -51,7 +52,8 @@ where } let svc = svc.take().expect("illegal state"); - let rx = tokio::spawn(svc.ready_oneshot().map_err(Into::into)); + let rx = + tokio::spawn(svc.ready_oneshot().map_err(Into::into).in_current_span()); Inner::Future(rx) } Inner::Future(ref mut fut) => { diff --git a/tower/tests/buffer/main.rs b/tower/tests/buffer/main.rs index 3bb12daf..805a346d 100644 --- a/tower/tests/buffer/main.rs +++ b/tower/tests/buffer/main.rs @@ -346,6 +346,25 @@ async fn wakes_pending_waiters_on_failure() { ); } +#[tokio::test(flavor = "current_thread")] +async fn propagates_trace_spans() { + use tower::{util::ServiceExt, Service}; + use tracing::Instrument; + + let _t = support::trace_init(); + + let span = tracing::info_span!("my_span"); + + let service = support::AssertSpanSvc::new(span.clone()); + let (mut service, worker) = Buffer::pair(service, 5); + let worker = tokio::spawn(worker); + + let result = tokio::spawn(service.oneshot(()).instrument(span)); + + result.await.expect("service panicked").expect("failed"); + worker.await.expect("worker panicked"); +} + #[tokio::test(flavor = "current_thread")] async fn doesnt_leak_permits() { let _t = support::trace_init(); diff --git a/tower/tests/spawn_ready/main.rs b/tower/tests/spawn_ready/main.rs index d348e363..157bb98c 100644 --- a/tower/tests/spawn_ready/main.rs +++ b/tower/tests/spawn_ready/main.rs @@ -4,7 +4,7 @@ mod support; use tokio::time; use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok}; -use tower::spawn_ready::SpawnReadyLayer; +use tower::spawn_ready::{SpawnReady, SpawnReadyLayer}; use tower_test::mock; #[tokio::test(flavor = "current_thread")] @@ -43,3 +43,19 @@ async fn when_inner_fails() { "foobar" ); } + +#[tokio::test(flavor = "current_thread")] +async fn propagates_trace_spans() { + use tower::{util::ServiceExt, Service}; + use tracing::Instrument; + + let _t = support::trace_init(); + + let span = tracing::info_span!("my_span"); + + let service = support::AssertSpanSvc::new(span.clone()); + let mut service = SpawnReady::new(service); + let result = tokio::spawn(service.oneshot(()).instrument(span)); + + result.await.expect("service panicked").expect("failed"); +} diff --git a/tower/tests/support.rs b/tower/tests/support.rs index 6a036ec9..150617c2 100644 --- a/tower/tests/support.rs +++ b/tower/tests/support.rs @@ -1,9 +1,12 @@ #![allow(dead_code)] +use futures::future; +use std::fmt; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::sync::mpsc; use tokio_stream::Stream; +use tower::Service; pub(crate) fn trace_init() -> tracing::subscriber::DefaultGuard { let subscriber = tracing_subscriber::fmt() @@ -33,3 +36,67 @@ impl Stream for IntoStream> { self.project().0.poll_recv(cx) } } + +#[derive(Clone, Debug)] +pub struct AssertSpanSvc { + span: tracing::Span, + polled: bool, +} + +pub struct AssertSpanError(String); + +impl fmt::Debug for AssertSpanError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.0, f) + } +} + +impl fmt::Display for AssertSpanError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.0, f) + } +} + +impl std::error::Error for AssertSpanError {} + +impl AssertSpanSvc { + pub fn new(span: tracing::Span) -> Self { + Self { + span, + polled: false, + } + } + + fn check(&self, func: &str) -> Result<(), AssertSpanError> { + let current_span = tracing::Span::current(); + tracing::debug!(?current_span, ?self.span, %func); + if current_span == self.span { + return Ok(()); + } + + Err(AssertSpanError(format!( + "{} called outside expected span\n expected: {:?}\n current: {:?}", + func, self.span, current_span + ))) + } +} + +impl Service<()> for AssertSpanSvc { + type Response = (); + type Error = AssertSpanError; + type Future = future::Ready>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + if self.polled { + return Poll::Ready(self.check("poll_ready")); + } + + cx.waker().wake_by_ref(); + self.polled = true; + Poll::Pending + } + + fn call(&mut self, _: ()) -> Self::Future { + future::ready(self.check("call")) + } +}