Tower services are now instrumentable

This commit is contained in:
Eliza Weisman 2018-09-21 15:11:42 -07:00
parent 13e3260439
commit deb5f5e1ac
5 changed files with 87 additions and 15 deletions

View File

@ -2,5 +2,6 @@
members = [
"tokio-trace",
"tokio-trace-tower",
"tokio-trace-slog"
]

View File

@ -0,0 +1,10 @@
[package]
name = "tokio-trace-tower"
version = "0.1.0"
authors = ["Eliza Weisman <eliza@buoyant.io>"]
edition = "2018"
[dependencies]
tokio-trace = { path = "../tokio-trace" }
futures = "0.1"
tower-service = "https://github.com/tower-rs/tower.git"

View File

@ -0,0 +1,50 @@
extern crate tower_service;
#[macro_use]
extern crate tokio_trace;
extern crate futures;
use tokio_trace::instrument::{Instrumented, Instrument};
use tower_service::Service;
use std::fmt;
#[derive(Clone, Debug)]
pub struct InstrumentedService<T> {
inner: T,
span: tokio_trace::Span,
}
pub trait InstrumentableService: Service + Sized {
fn instrument(self, span: tokio_trace::Span) -> InstrumentedService<Self> {
InstrumentedService {
inner: self,
span,
}
}
}
impl<T: Service> Service for InstrumentedService<T>
where
T::Request: fmt::Debug + Clone + Send + Sync + 'static,
{
type Request = T::Request;
type Response = T::Response;
type Future = Instrumented<T::Future>;
type Error = T::Error;
fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> {
let span = &self.span;
let inner = &mut self.inner;
span.enter(|| { inner.poll_ready() })
}
fn call(&mut self, request: Self::Request) -> Self::Future {
let span = &self.span;
let inner = &mut self.inner;
span.enter(|| {
let request_span = span!("request", request = request.clone());
request_span.clone().enter(move || {
inner.call(request).instrument(request_span)
})
})
}
}

View File

@ -1,12 +1,12 @@
use super::Span;
use futures::{Future, Sink, Stream, Poll};
use futures::{Future, Sink, Stream, Poll, StartSend};
// TODO: seal?
pub trait Instrument: Sized {
fn instrument(self, span: Span) -> Instrumented<Self> {
Instrumented {
inner: self,
span
span,
}
}
}
@ -17,33 +17,35 @@ pub struct Instrumented<T> {
span: Span,
}
impl<T: Future + Sized> Instrument for T {}
impl<T: Stream + Sized> Instrument for T {}
impl<T: Sink + Sized> Instrument for T {}
impl<T: Sized> Instrument for T {}
impl<T: Future> Future for Instrument<T> {
impl<T: Future> Future for Instrumented<T> {
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.span.enter(|| {
self.inner.poll()
let span = &self.span;
let inner = &mut self.inner;
span.enter(move || {
inner.poll()
})
}
}
impl<T: Stream> Stream for Instrument<T> {
impl<T: Stream> Stream for Instrumented<T> {
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.span.enter(|| {
self.inner.poll()
let span = &self.span;
let inner = &mut self.inner;
span.enter(move || {
inner.poll()
})
}
}
impl<T: Sink> Sink for Instrument<T> {
impl<T: Sink> Sink for Instrumented<T> {
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
@ -51,10 +53,18 @@ impl<T: Sink> Sink for Instrument<T> {
&mut self,
item: Self::SinkItem
) -> StartSend<Self::SinkItem, Self::SinkError> {
self.span.enter(|| self.inner.start_send(item))
let span = &self.span;
let inner = &mut self.inner;
span.enter(move || {
inner.start_send(item)
})
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
self.span.enter(|| { self.inner.poll_complete() })
let span = &self.span;
let inner = &mut self.inner;
span.enter(move || {
inner.poll_complete()
})
}
}

View File

@ -87,6 +87,7 @@ thread_local! {
mod dedup;
mod dispatcher;
pub mod subscriber;
pub mod instrument;
pub use dispatcher::{Builder as DispatcherBuilder, Dispatcher};