spawn_ready: propagate tracing spans (#557)

Currently, when using `SpawnReady`, the current `tracing` span is not
propagated to the spawned task when the inner service is not ready. This
means that any traces emitted by the inner service's `poll_ready` occur
in their own root span, rather than the span the `SpawnReady` service
was polled in.

This branch fixes this by propagating the current trace span to the
spawned task.

This means that "spawn-ready" now enables the "tracing" feature. In the
future, we may want to consider feature-flagging `tracing` separately
from the middleware implementation that contain tracing instrumentation,
but doing so would break traces if the feature flag isn't enabled. This
doesn't break API compatibility, but it *does* break functionality, so
we may not want to do that until the next breaking change release.

I also added tests for span propagation. I realized the same test could
easily be applied to `Buffer`, which also propagates `tracing` spans, to
guard against regressions, so I also added a test for buffer.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
Eliza Weisman 2021-02-10 15:01:58 -08:00 committed by GitHub
parent f90f518f9f
commit ec547b329b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 111 additions and 5 deletions

View File

@ -11,12 +11,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `tracing` is now only pulled in for the features that need it.
- **util**: Add `option_layer` to convert an `Option<Layer>` into a `Layer`. ([#555])
- **builder**: Add `ServiceBuilder::option_layer` to optionally add a layer. ([#555])
- **steer**: `Steer` now implements `Clone
- **spawn-ready**: SpawnReady now propagates the current `tracing` span to
spawned tasks ([#557])
- **steer**: `Steer` now implements `Clone`.
- **make**: Added `Shared` which lets you implement `MakeService` by cloning a
service.
[#542]: https://github.com/tower-rs/tower/pull/542
[#555]: https://github.com/tower-rs/tower/pull/555
[#557]: https://github.com/tower-rs/tower/pull/557
# 0.4.4 (January 20, 2021)
@ -36,7 +39,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[#532]: https://github.com/tower-rs/tower/pull/532
[#535]: https://github.com/tower-rs/tower/pull/535
[#538]: https://github.com/tower-rs/tower/pull/538
>>>>>>> master
# 0.4.3 (January 13, 2021)

View File

@ -56,7 +56,7 @@ make = ["tokio/io-std", "futures-util"]
ready-cache = ["futures-util", "indexmap", "tokio/sync", "tracing"]
reconnect = ["make", "tokio/io-std", "tracing"]
retry = ["tokio/time"]
spawn-ready = ["futures-util", "tokio/sync", "tokio/rt", "util"]
spawn-ready = ["futures-util", "tokio/sync", "tokio/rt", "util", "tracing"]
steer = ["futures-util"]
timeout = ["tokio/time"]
util = ["futures-util"]

View File

@ -8,6 +8,7 @@ use std::{
task::{Context, Poll},
};
use tower_service::Service;
use tracing::Instrument;
/// Spawns tasks to drive an inner service to readiness.
///
@ -51,7 +52,8 @@ where
}
let svc = svc.take().expect("illegal state");
let rx = tokio::spawn(svc.ready_oneshot().map_err(Into::into));
let rx =
tokio::spawn(svc.ready_oneshot().map_err(Into::into).in_current_span());
Inner::Future(rx)
}
Inner::Future(ref mut fut) => {

View File

@ -346,6 +346,25 @@ async fn wakes_pending_waiters_on_failure() {
);
}
#[tokio::test(flavor = "current_thread")]
async fn propagates_trace_spans() {
use tower::{util::ServiceExt, Service};
use tracing::Instrument;
let _t = support::trace_init();
let span = tracing::info_span!("my_span");
let service = support::AssertSpanSvc::new(span.clone());
let (mut service, worker) = Buffer::pair(service, 5);
let worker = tokio::spawn(worker);
let result = tokio::spawn(service.oneshot(()).instrument(span));
result.await.expect("service panicked").expect("failed");
worker.await.expect("worker panicked");
}
#[tokio::test(flavor = "current_thread")]
async fn doesnt_leak_permits() {
let _t = support::trace_init();

View File

@ -4,7 +4,7 @@ mod support;
use tokio::time;
use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
use tower::spawn_ready::SpawnReadyLayer;
use tower::spawn_ready::{SpawnReady, SpawnReadyLayer};
use tower_test::mock;
#[tokio::test(flavor = "current_thread")]
@ -43,3 +43,19 @@ async fn when_inner_fails() {
"foobar"
);
}
#[tokio::test(flavor = "current_thread")]
async fn propagates_trace_spans() {
use tower::{util::ServiceExt, Service};
use tracing::Instrument;
let _t = support::trace_init();
let span = tracing::info_span!("my_span");
let service = support::AssertSpanSvc::new(span.clone());
let mut service = SpawnReady::new(service);
let result = tokio::spawn(service.oneshot(()).instrument(span));
result.await.expect("service panicked").expect("failed");
}

View File

@ -1,9 +1,12 @@
#![allow(dead_code)]
use futures::future;
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
use tokio_stream::Stream;
use tower::Service;
pub(crate) fn trace_init() -> tracing::subscriber::DefaultGuard {
let subscriber = tracing_subscriber::fmt()
@ -33,3 +36,67 @@ impl<I> Stream for IntoStream<mpsc::UnboundedReceiver<I>> {
self.project().0.poll_recv(cx)
}
}
#[derive(Clone, Debug)]
pub struct AssertSpanSvc {
span: tracing::Span,
polled: bool,
}
pub struct AssertSpanError(String);
impl fmt::Debug for AssertSpanError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.0, f)
}
}
impl fmt::Display for AssertSpanError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.0, f)
}
}
impl std::error::Error for AssertSpanError {}
impl AssertSpanSvc {
pub fn new(span: tracing::Span) -> Self {
Self {
span,
polled: false,
}
}
fn check(&self, func: &str) -> Result<(), AssertSpanError> {
let current_span = tracing::Span::current();
tracing::debug!(?current_span, ?self.span, %func);
if current_span == self.span {
return Ok(());
}
Err(AssertSpanError(format!(
"{} called outside expected span\n expected: {:?}\n current: {:?}",
func, self.span, current_span
)))
}
}
impl Service<()> for AssertSpanSvc {
type Response = ();
type Error = AssertSpanError;
type Future = future::Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.polled {
return Poll::Ready(self.check("poll_ready"));
}
cx.waker().wake_by_ref();
self.polled = true;
Poll::Pending
}
fn call(&mut self, _: ()) -> Self::Future {
future::ready(self.check("call"))
}
}