From deb5f5e1ac64a75ecab49adf27c6800eb7633f19 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 21 Sep 2018 15:11:42 -0700 Subject: [PATCH] Tower services are now instrumentable --- Cargo.toml | 1 + tokio-trace-tower/Cargo.toml | 10 +++++++ tokio-trace-tower/src/lib.rs | 50 +++++++++++++++++++++++++++++++++++ tokio-trace/src/instrument.rs | 40 +++++++++++++++++----------- tokio-trace/src/lib.rs | 1 + 5 files changed, 87 insertions(+), 15 deletions(-) create mode 100644 tokio-trace-tower/Cargo.toml create mode 100644 tokio-trace-tower/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 8ac97904..5674d54a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,5 +2,6 @@ members = [ "tokio-trace", + "tokio-trace-tower", "tokio-trace-slog" ] diff --git a/tokio-trace-tower/Cargo.toml b/tokio-trace-tower/Cargo.toml new file mode 100644 index 00000000..4bbf3df0 --- /dev/null +++ b/tokio-trace-tower/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "tokio-trace-tower" +version = "0.1.0" +authors = ["Eliza Weisman "] +edition = "2018" + +[dependencies] +tokio-trace = { path = "../tokio-trace" } +futures = "0.1" +tower-service = "https://github.com/tower-rs/tower.git" diff --git a/tokio-trace-tower/src/lib.rs b/tokio-trace-tower/src/lib.rs new file mode 100644 index 00000000..865a413f --- /dev/null +++ b/tokio-trace-tower/src/lib.rs @@ -0,0 +1,50 @@ +extern crate tower_service; +#[macro_use] +extern crate tokio_trace; +extern crate futures; + +use tokio_trace::instrument::{Instrumented, Instrument}; +use tower_service::Service; +use std::fmt; + +#[derive(Clone, Debug)] +pub struct InstrumentedService { + inner: T, + span: tokio_trace::Span, +} + +pub trait InstrumentableService: Service + Sized { + fn instrument(self, span: tokio_trace::Span) -> InstrumentedService { + InstrumentedService { + inner: self, + span, + } + } +} + +impl Service for InstrumentedService +where + T::Request: fmt::Debug + Clone + Send + Sync + 'static, +{ + type Request = T::Request; + type Response = T::Response; + type Future = Instrumented; + type Error = T::Error; + + fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> { + let span = &self.span; + let inner = &mut self.inner; + span.enter(|| { inner.poll_ready() }) + } + + fn call(&mut self, request: Self::Request) -> Self::Future { + let span = &self.span; + let inner = &mut self.inner; + span.enter(|| { + let request_span = span!("request", request = request.clone()); + request_span.clone().enter(move || { + inner.call(request).instrument(request_span) + }) + }) + } +} diff --git a/tokio-trace/src/instrument.rs b/tokio-trace/src/instrument.rs index e4254f48..ac0b202b 100644 --- a/tokio-trace/src/instrument.rs +++ b/tokio-trace/src/instrument.rs @@ -1,12 +1,12 @@ use super::Span; -use futures::{Future, Sink, Stream, Poll}; - +use futures::{Future, Sink, Stream, Poll, StartSend}; +// TODO: seal? pub trait Instrument: Sized { fn instrument(self, span: Span) -> Instrumented { Instrumented { inner: self, - span + span, } } } @@ -17,33 +17,35 @@ pub struct Instrumented { span: Span, } -impl Instrument for T {} -impl Instrument for T {} -impl Instrument for T {} +impl Instrument for T {} -impl Future for Instrument { +impl Future for Instrumented { type Item = T::Item; type Error = T::Error; fn poll(&mut self) -> Poll { - self.span.enter(|| { - self.inner.poll() + let span = &self.span; + let inner = &mut self.inner; + span.enter(move || { + inner.poll() }) } } -impl Stream for Instrument { +impl Stream for Instrumented { type Item = T::Item; type Error = T::Error; fn poll(&mut self) -> Poll, Self::Error> { - self.span.enter(|| { - self.inner.poll() + let span = &self.span; + let inner = &mut self.inner; + span.enter(move || { + inner.poll() }) } } -impl Sink for Instrument { +impl Sink for Instrumented { type SinkItem = T::SinkItem; type SinkError = T::SinkError; @@ -51,10 +53,18 @@ impl Sink for Instrument { &mut self, item: Self::SinkItem ) -> StartSend { - self.span.enter(|| self.inner.start_send(item)) + let span = &self.span; + let inner = &mut self.inner; + span.enter(move || { + inner.start_send(item) + }) } fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.span.enter(|| { self.inner.poll_complete() }) + let span = &self.span; + let inner = &mut self.inner; + span.enter(move || { + inner.poll_complete() + }) } } diff --git a/tokio-trace/src/lib.rs b/tokio-trace/src/lib.rs index d760f430..5a1b135f 100644 --- a/tokio-trace/src/lib.rs +++ b/tokio-trace/src/lib.rs @@ -87,6 +87,7 @@ thread_local! { mod dedup; mod dispatcher; pub mod subscriber; +pub mod instrument; pub use dispatcher::{Builder as DispatcherBuilder, Dispatcher};