tower: Rewrite the tower support crate (#198)

## Motivation

The `tracing-tower` crate for compatibility with `tower` is currently
pretty half-baked and incomplete, and doesn't conver a majority of
use-cases.

## Solution

This branch completely rewrites the `tracing-tower` support crate,
hopefully making it more useful. 

We now provide a set of `request_span` middleware for instrumenting a
service so that each request passing through it is instrumented with its
own span, and a set of `service_span` metadata that instruments the
entire service with a single span. Additionally, there are now
implementations of `Layer` and `MakeService`, when the `tower-layer` and
`tower-util` crates are also in use. Finally, I've reworked the
`InstrumentableService` extension trait a bit to make it easier to
implement services in different ways, although the old behaviour of
`InstrumentableService::instrument` is unchanged (I've heard
@LucioFranco is using this in prod...).

Most of the functionality of the `tracing-tower-http` crate can now be
reimplemented using the new `tracing-tower` crate (and a feature-flagged
`http` dependency), so that crate should _probably_ now be deprecated.
I've reimplemented its example using `tracing-tower`.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
Eliza Weisman 2019-07-22 10:54:46 -07:00 committed by GitHub
parent 9d9ba343f1
commit d1ab9dc393
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 980 additions and 32 deletions

View File

@ -4,8 +4,29 @@ version = "0.1.0"
authors = ["Eliza Weisman <eliza@buoyant.io>"]
edition = "2018"
[features]
default = ["tower-layer", "tower-util", "http"]
[dependencies]
tracing = "0.1"
tracing-futures = { path = "../tracing-futures" }
futures = "0.1"
tower-service = "0.2"
tower-layer = { version = "0.1", optional = true }
tower-util = { version = "0.1", optional = true }
http = { version = "0.1", optional = true }
[dev-dependencies]
bytes = "0.4"
h2 = "=0.1.11"
tower-h2 = { git = "https://github.com/tower-rs/tower-h2.git" }
string = { git = "https://github.com/carllerche/string" }
tokio = "0.1"
tokio-current-thread = "0.1.1"
tokio-connect = { git = "https://github.com/carllerche/tokio-connect" }
tracing-subscriber = { path = "../tracing-subscriber" }
tracing-fmt = { path = "../tracing-fmt" }
tokio-io = "0.1"
ansi_term = "0.11"
humantime = "1.1.1"
env_logger = "0.5"

View File

@ -0,0 +1,181 @@
use bytes::Bytes;
use futures::*;
use h2::Reason;
use http::{Request, Response};
use std::net::SocketAddr;
use string::{String, TryFrom};
use tokio::net::TcpStream;
use tokio::runtime::{Runtime, TaskExecutor};
use tower_h2::client::Connect;
use tower_h2::{Body, RecvBody};
use tower_service::Service;
use tower_util::MakeService;
use tracing;
use tracing_futures::Instrument;
use tracing_tower::InstrumentableService;
pub struct Conn(SocketAddr);
fn main() {
// Set the default subscriber to record all traces emitted by this example
// and by the `tracing_tower` library's helpers.
let subscriber = tracing_fmt::FmtSubscriber::builder()
.with_filter(tracing_fmt::filter::EnvFilter::from(
"h2_client=trace,tracing_tower=trace",
))
.finish();
let _ = tracing::subscriber::set_global_default(subscriber);
let mut rt = Runtime::new().unwrap();
let executor = rt.executor();
let addr = "[::1]:8888".parse().unwrap();
impl Service<()> for Conn {
type Response = TcpStream;
type Error = ::std::io::Error;
type Future = Box<Future<Item = TcpStream, Error = ::std::io::Error> + Send>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(().into())
}
fn call(&mut self, _: ()) -> Self::Future {
tracing::debug!("connecting...");
let c = TcpStream::connect(&self.0)
.and_then(|tcp| {
tcp.set_nodelay(true)?;
tracing::info!("connected!");
Ok(tcp)
})
.map_err(|error| {
tracing::error!(%error);
error
});
Box::new(c)
}
}
let conn = Conn(addr).trace_requests(tracing::debug_span!("connect", remote = %addr));
let mut h2 = Connect::new(conn, Default::default(), executor.clone());
let req_span: fn(&http::Request<_>) -> tracing::Span = |req| {
let span = tracing::trace_span!(
"request",
req.method = ?req.method(),
req.path = ?req.uri().path(),
);
{
// TODO: this is a workaround because tracing-fmt doesn't honor
// overridden request parents.
let _enter = span.enter();
tracing::trace!(parent: &span, "sending request...");
}
span
};
let done = h2
.make_service(())
.map_err(|_| Reason::REFUSED_STREAM.into())
.and_then(move |h2| {
let h2 = h2.trace_requests(req_span);
Serial {
h2,
count: 10,
pending: None,
}
})
.map(|_| println!("done"))
.map_err(|e| println!("error: {:?}", e));
rt.spawn(done);
rt.shutdown_on_idle().wait().unwrap();
}
/// Avoids overflowing max concurrent streams
struct Serial {
count: usize,
h2: tracing_tower::request_span::Service<
tower_h2::client::Connection<TcpStream, TaskExecutor, tower_h2::NoBody>,
http::Request<tower_h2::NoBody>,
>,
pending: Option<Box<Future<Item = (), Error = tower_h2::client::Error> + Send>>,
}
impl Future for Serial {
type Item = ();
type Error = tower_h2::client::Error;
fn poll(&mut self) -> Poll<(), Self::Error> {
loop {
if let Some(mut fut) = self.pending.take() {
if fut.poll()?.is_not_ready() {
self.pending = Some(fut);
return Ok(Async::NotReady);
}
}
if self.count == 0 {
return Ok(Async::Ready(()));
}
self.count -= 1;
let mut fut = {
let span = tracing::debug_span!("serial", req.number = self.count);
let _enter = span.enter();
self.h2
.call(mkreq())
.and_then(move |rsp| read_response(rsp).map_err(Into::into))
.instrument(span.clone())
};
if fut.poll()?.is_not_ready() {
self.pending = Some(Box::new(fut));
return Ok(Async::NotReady);
}
}
}
}
fn mkreq() -> Request<tower_h2::NoBody> {
Request::builder()
.method("GET")
.uri("http://[::1]:8888/")
.version(http::Version::HTTP_2)
.body(tower_h2::NoBody)
.unwrap()
}
fn read_response(rsp: Response<RecvBody>) -> tracing_futures::Instrumented<ReadResponse> {
let span = tracing::trace_span!("response");
let f = {
let _enter = span.enter();
let (parts, body) = rsp.into_parts();
tracing::debug!(rsp.status = %parts.status);
ReadResponse { body }
};
f.instrument(span)
}
struct ReadResponse {
body: RecvBody,
}
impl Future for ReadResponse {
type Item = ();
type Error = tower_h2::client::Error;
fn poll(&mut self) -> Poll<(), Self::Error> {
loop {
match try_ready!(self.body.poll_data()) {
None => return Ok(Async::Ready(())),
Some(b) => {
let b: Bytes = b.into();
{
let s = String::try_from(b).expect("decode utf8 string");
tracing::trace!(rsp.body = &*s);
}
}
}
}
}
}

View File

@ -0,0 +1,150 @@
use bytes::{Bytes, IntoBuf};
use futures::*;
use http::Request;
use tokio::executor::DefaultExecutor;
use tokio::net::TcpListener;
use tower_h2::{Body, RecvBody, Server};
use tower_service::Service;
use tracing_futures::Instrument;
use tracing_tower::InstrumentMake;
type Response = http::Response<RspBody>;
struct RspBody(Option<Bytes>);
impl RspBody {
fn new(body: Bytes) -> Self {
RspBody(Some(body))
}
fn empty() -> Self {
RspBody(None)
}
}
impl Body for RspBody {
type Data = <Bytes as IntoBuf>::Buf;
type Error = h2::Error;
fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
let data = self.0.take().and_then(|b| {
if b.is_empty() {
None
} else {
Some(b.into_buf())
}
});
Ok(Async::Ready(data))
}
fn poll_trailers(&mut self) -> Poll<Option<http::HeaderMap>, Self::Error> {
Ok(None.into())
}
}
const ROOT: &'static str = "/";
#[derive(Debug)]
struct Svc;
impl Service<Request<RecvBody>> for Svc {
type Response = Response;
type Error = h2::Error;
type Future = future::FutureResult<Response, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Request<RecvBody>) -> Self::Future {
tracing::trace!(message = "received request", request.headers = ?req.headers());
let mut rsp = http::Response::builder();
rsp.version(http::Version::HTTP_2);
let uri = req.uri();
let rsp = if uri.path() != ROOT {
let body = RspBody::empty();
tracing::warn!(rsp.error = %"unrecognized path", request.path = ?uri.path());
rsp.status(404).body(body).unwrap()
} else {
let body = RspBody::new("heyo!".into());
rsp.status(200).body(body).unwrap()
};
tracing::debug!(rsp.status = %rsp.status(), message = "sending response...");
future::ok(rsp)
}
}
#[derive(Debug)]
struct NewSvc;
impl tower_service::Service<()> for NewSvc {
type Response = Svc;
type Error = ::std::io::Error;
type Future = future::FutureResult<Svc, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, _target: ()) -> Self::Future {
future::ok(Svc)
}
}
fn main() {
// Set the default subscriber to record all traces emitted by this example
// and by the `tracing_tower` library's helpers.
let subscriber = tracing_fmt::FmtSubscriber::builder()
.with_filter(tracing_fmt::filter::EnvFilter::from(
"h2_server=trace,tracing_tower=trace",
))
.finish();
let _ = tracing::subscriber::set_global_default(subscriber);
let addr = "[::1]:8888".parse().unwrap();
let bind = TcpListener::bind(&addr).expect("bind");
// Construct a span for the server task, annotated with the listening IP
// address and port.
let span = tracing::trace_span!("server", ip = %addr.ip(), port = addr.port());
let server = lazy(|| {
let executor = DefaultExecutor::current();
// Enrich the `MakeService` with a wrapper so that each request is
// traced with its own span.
let new_svc = NewSvc.with_traced_requests(tracing_tower::http::debug_request);
let h2 = Server::new(new_svc, Default::default(), executor);
tracing::info!("listening");
bind.incoming()
.fold(h2, |mut h2, sock| {
// Construct a new span for each accepted connection.
let addr = sock.peer_addr().expect("can't get addr");
let span = tracing::trace_span!("conn", ip = %addr.ip(), port = addr.port());
let _enter = span.enter();
tracing::debug!("accepted connection");
if let Err(e) = sock.set_nodelay(true) {
return Err(e);
}
let serve = h2
.serve(sock)
.map_err(|error| tracing::error!(message = "h2 error", %error))
.map(|_| {
tracing::trace!("finished serving connection");
})
.instrument(span.clone());
tokio::spawn(serve);
Ok(h2)
})
.map_err(|error| tracing::error!(message = "serve error", %error))
.map(|_| {})
})
.instrument(span);
tokio::run(server);
}

47
tracing-tower/src/http.rs Normal file
View File

@ -0,0 +1,47 @@
use http;
use tracing;
macro_rules! make_req_fns {
($($name:ident, $level:expr),+) => {
$(
#[inline]
pub fn $name<A>(req: &http::Request<A>) -> tracing::Span {
tracing::span!(
$level,
"request",
method = ?req.method(),
uri = ?req.uri(),
)
}
)+
}
}
make_req_fns! {
info_request, tracing::Level::INFO,
warn_request, tracing::Level::WARN,
error_request, tracing::Level::ERROR
}
#[inline]
pub fn debug_request<A>(req: &http::Request<A>) -> tracing::Span {
tracing::span!(
tracing::Level::DEBUG,
"request",
method = ?req.method(),
uri = ?req.uri(),
version = ?req.version(),
)
}
#[inline]
pub fn trace_request<A>(req: &http::Request<A>) -> tracing::Span {
tracing::span!(
tracing::Level::TRACE,
"request",
method = ?req.method(),
uri = ?req.uri(),
version = ?req.version(),
headers = ?req.headers(),
)
}

View File

@ -1,44 +1,96 @@
extern crate tower_service;
#[macro_use]
extern crate tracing;
extern crate futures;
extern crate tracing_futures;
use std::fmt;
use tower_service::Service;
use tracing::Level;
use tracing_futures::{Instrument, Instrumented};
#[derive(Clone, Debug)]
pub struct InstrumentedService<T> {
inner: T,
span: tracing::Span,
}
pub mod request_span;
pub mod service_span;
pub trait InstrumentableService<Request>: Service<Request> + Sized {
fn instrument(self, span: tracing::Span) -> InstrumentedService<Self> {
InstrumentedService { inner: self, span }
}
}
#[cfg(feature = "http")]
pub mod http;
impl<T: Service<Request>, Request> Service<Request> for InstrumentedService<T>
pub type InstrumentedService<S, R> = service_span::Service<request_span::Service<S, R>>;
pub trait InstrumentableService<Request>
where
// TODO: it would be nice to do more for HTTP services...
Request: fmt::Debug + Clone + Send + Sync + 'static,
Self: Service<Request> + Sized,
{
type Response = T::Response;
type Error = T::Error;
type Future = Instrumented<T::Future>;
fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> {
let _enter = self.span.enter();
self.inner.poll_ready()
fn instrument<G>(self, svc_span: G) -> InstrumentedService<Self, Request>
where
G: GetSpan<Self>,
Request: fmt::Debug,
{
let req_span: fn(&Request) -> tracing::Span =
|request| tracing::span!(Level::TRACE, "request", ?request);
let svc_span = svc_span.span_for(&self);
self.trace_requests(req_span).trace_service(svc_span)
}
fn call(&mut self, req: Request) -> Self::Future {
// TODO: custom `Value` impls for `http` types would be nice...
let span = span!(parent: &self.span, Level::TRACE, "request", request = ?req);
let _enter = span.enter();
self.inner.call(req).instrument(span.clone())
fn trace_requests<G>(self, get_span: G) -> request_span::Service<Self, Request, G>
where
G: GetSpan<Request> + Clone,
{
request_span::Service::new(self, get_span)
}
fn trace_service<G>(self, get_span: G) -> service_span::Service<Self>
where
G: GetSpan<Self>,
{
let span = get_span.span_for(&self);
service_span::Service::new(self, span)
}
}
#[cfg(feature = "tower-util")]
pub trait InstrumentMake<T, R>
where
Self: tower_util::MakeService<T, R> + Sized,
{
fn with_traced_service<G>(self, get_span: G) -> service_span::MakeService<Self, T, R, G>
where
G: GetSpan<T>,
{
service_span::MakeService::new(self, get_span)
}
fn with_traced_requests<G>(self, get_span: G) -> request_span::MakeService<Self, R, G>
where
G: GetSpan<R> + Clone,
{
request_span::MakeService::new(self, get_span)
}
}
impl<S, R> InstrumentableService<R> for S where S: Service<R> + Sized {}
#[cfg(feature = "tower-util")]
impl<M, T, R> InstrumentMake<T, R> for M where M: tower_util::MakeService<T, R> {}
pub trait GetSpan<T>: crate::sealed::Sealed<T> {
fn span_for(&self, target: &T) -> tracing::Span;
}
impl<T, F> crate::sealed::Sealed<T> for F where F: Fn(&T) -> tracing::Span {}
impl<T, F> GetSpan<T> for F
where
F: Fn(&T) -> tracing::Span,
{
#[inline]
fn span_for(&self, target: &T) -> tracing::Span {
(self)(target)
}
}
impl<T> crate::sealed::Sealed<T> for tracing::Span {}
impl<T> GetSpan<T> for tracing::Span {
#[inline]
fn span_for(&self, _: &T) -> tracing::Span {
self.clone()
}
}
mod sealed {
pub trait Sealed<T = ()> {}
}

View File

@ -0,0 +1,261 @@
//! Middleware which instruments each request passing through a service with a new span.
use super::GetSpan;
use futures::{future::Future, Async, Poll};
use std::marker::PhantomData;
use tracing_futures::Instrument;
#[derive(Debug)]
pub struct Service<S, R, G = fn(&R) -> tracing::Span>
where
S: tower_service::Service<R>,
G: GetSpan<R>,
{
get_span: G,
inner: S,
_p: PhantomData<fn(R)>,
}
#[cfg(feature = "tower-layer")]
pub use self::layer::*;
#[cfg(feature = "tower-layer")]
mod layer {
use super::*;
#[derive(Debug)]
pub struct Layer<R, G = fn(&R) -> tracing::Span>
where
G: GetSpan<R> + Clone,
{
get_span: G,
_p: PhantomData<fn(R)>,
}
pub fn layer<R, G>(get_span: G) -> Layer<R, G>
where
G: GetSpan<R> + Clone,
{
Layer {
get_span,
_p: PhantomData,
}
}
// === impl Layer ===
impl<S, R, G> tower_layer::Layer<S> for Layer<R, G>
where
S: tower_service::Service<R>,
G: GetSpan<R> + Clone,
{
type Service = Service<S, R, G>;
fn layer(&self, service: S) -> Self::Service {
Service::new(service, self.get_span.clone())
}
}
impl<R, G> Clone for Layer<R, G>
where
G: GetSpan<R> + Clone,
{
fn clone(&self) -> Self {
Self {
get_span: self.get_span.clone(),
_p: PhantomData,
}
}
}
}
#[cfg(feature = "tower-util")]
pub use self::make::MakeService;
#[cfg(feature = "tower-util")]
pub mod make {
use super::*;
pub type MakeFuture<S, R, G> = MakeService<S, R, Option<G>>;
#[derive(Debug)]
pub struct MakeService<S, R, G = fn(&R) -> tracing::Span> {
get_span: G,
inner: S,
_p: PhantomData<fn(R)>,
}
#[cfg(feature = "tower-layer")]
#[derive(Debug)]
pub struct MakeLayer<R, T, G = fn(&R) -> tracing::Span>
where
G: GetSpan<R> + Clone,
{
get_span: G,
_p: PhantomData<fn(T, R)>,
}
#[cfg(feature = "tower-layer")]
pub fn layer<R, T, G>(get_span: G) -> MakeLayer<R, T, G>
where
G: GetSpan<R> + Clone,
{
MakeLayer {
get_span,
_p: PhantomData,
}
}
// === impl MakeLayer ===
#[cfg(feature = "tower-layer")]
impl<S, R, G, T> tower_layer::Layer<S> for MakeLayer<R, T, G>
where
S: tower_util::MakeService<T, R>,
G: GetSpan<R> + Clone,
{
type Service = MakeService<S, R, G>;
fn layer(&self, inner: S) -> Self::Service {
MakeService::new(inner, self.get_span.clone())
}
}
#[cfg(feature = "tower-layer")]
impl<R, T, G> Clone for MakeLayer<R, T, G>
where
G: GetSpan<R> + Clone,
{
fn clone(&self) -> Self {
Self {
get_span: self.get_span.clone(),
_p: PhantomData,
}
}
}
// === impl MakeService ===
impl<S, R, G, T> tower_service::Service<T> for MakeService<S, R, G>
where
S: tower_util::MakeService<T, R>,
G: GetSpan<R> + Clone,
{
type Response = Service<S::Service, R, G>;
type Error = S::MakeError;
type Future = MakeFuture<S::Future, R, G>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready()
}
fn call(&mut self, target: T) -> Self::Future {
let inner = self.inner.make_service(target);
let get_span = Some(self.get_span.clone());
MakeService {
get_span,
inner,
_p: PhantomData,
}
}
}
impl<S, R, G> MakeService<S, R, G>
where
G: GetSpan<R> + Clone,
{
pub fn new<T>(inner: S, get_span: G) -> Self
where
S: tower_util::MakeService<T, R>,
{
Self {
get_span,
inner,
_p: PhantomData,
}
}
}
impl<S, R, G> Clone for MakeService<S, R, G>
where
G: GetSpan<R> + Clone,
S: Clone,
{
fn clone(&self) -> Self {
Self {
get_span: self.get_span.clone(),
inner: self.inner.clone(),
_p: PhantomData,
}
}
}
impl<S, R, G> Future for MakeService<S, R, Option<G>>
where
S: Future,
S::Item: tower_service::Service<R>,
G: GetSpan<R> + Clone,
{
type Item = Service<S::Item, R, G>;
type Error = S::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let inner = futures::try_ready!(self.inner.poll());
let get_span = self.get_span.take().expect("polled after ready");
Ok(Async::Ready(Service {
inner,
get_span,
_p: PhantomData,
}))
}
}
}
// === impl Service ===
impl<S, R, G> tower_service::Service<R> for Service<S, R, G>
where
S: tower_service::Service<R>,
G: GetSpan<R> + Clone,
{
type Response = S::Response;
type Error = S::Error;
type Future = tracing_futures::Instrumented<S::Future>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready()
}
fn call(&mut self, request: R) -> Self::Future {
let span = self.get_span.span_for(&request);
let _enter = span.enter();
self.inner.call(request).instrument(span.clone())
}
}
impl<S, R, G> Clone for Service<S, R, G>
where
S: tower_service::Service<R> + Clone,
G: GetSpan<R> + Clone,
{
fn clone(&self) -> Self {
Service {
get_span: self.get_span.clone(),
inner: self.inner.clone(),
_p: PhantomData,
}
}
}
impl<S, R, G> Service<S, R, G>
where
S: tower_service::Service<R>,
G: GetSpan<R> + Clone,
{
pub fn new(inner: S, get_span: G) -> Self {
Service {
get_span,
inner,
_p: PhantomData,
}
}
}

View File

@ -0,0 +1,236 @@
//! Middleware which instruments a service with a span entered when that service
//! is called.
use crate::GetSpan;
use futures::{future::Future, Async, Poll};
use std::marker::PhantomData;
#[derive(Debug)]
pub struct Service<S> {
inner: S,
span: tracing::Span,
}
#[cfg(feature = "tower-layer")]
pub use self::layer::*;
#[cfg(feature = "tower-util")]
pub use self::make::MakeService;
#[cfg(feature = "tower-layer")]
mod layer {
use super::*;
#[derive(Debug)]
pub struct Layer<S, R, G = fn(&S) -> tracing::Span>
where
G: GetSpan<S>,
S: tower_service::Service<R>,
{
get_span: G,
_p: PhantomData<fn(S, R)>,
}
pub fn layer<S, R, G>(get_span: G) -> Layer<S, R, G>
where
G: GetSpan<S>,
S: tower_service::Service<R>,
{
Layer {
get_span,
_p: PhantomData,
}
}
// === impl Layer ===
impl<S, R, G> tower_layer::Layer<S> for Layer<S, R, G>
where
G: GetSpan<S>,
S: tower_service::Service<R>,
{
type Service = Service<S>;
fn layer(&self, inner: S) -> Self::Service {
let span = self.get_span.span_for(&inner);
Service { span, inner }
}
}
impl<S, R, G> Clone for Layer<S, R, G>
where
G: GetSpan<S> + Clone,
S: tower_service::Service<R>,
{
fn clone(&self) -> Self {
Self {
get_span: self.get_span.clone(),
_p: PhantomData,
}
}
}
}
#[cfg(feature = "tower-util")]
pub mod make {
use super::*;
#[derive(Debug)]
pub struct MakeService<M, T, R, G = fn(&T) -> tracing::Span>
where
G: GetSpan<T>,
{
get_span: G,
inner: M,
_p: PhantomData<fn(T, R)>,
}
#[derive(Debug)]
pub struct MakeFuture<F> {
inner: F,
span: Option<tracing::Span>,
}
#[derive(Debug)]
pub struct MakeLayer<T, R, G = fn(&T) -> tracing::Span>
where
G: GetSpan<T> + Clone,
{
get_span: G,
_p: PhantomData<fn(T, R)>,
}
#[cfg(feature = "tower-layer")]
pub fn layer<T, R, G>(get_span: G) -> MakeLayer<T, R, G>
where
G: GetSpan<T> + Clone,
{
MakeLayer {
get_span,
_p: PhantomData,
}
}
// === impl MakeLayer ===
#[cfg(feature = "tower-layer")]
impl<M, T, R, G> tower_layer::Layer<M> for MakeLayer<T, R, G>
where
M: tower_util::MakeService<T, R>,
G: GetSpan<T> + Clone,
{
type Service = MakeService<M, T, R, G>;
fn layer(&self, inner: M) -> Self::Service {
MakeService::new(inner, self.get_span.clone())
}
}
#[cfg(feature = "tower-layer")]
impl<T, R, G> Clone for MakeLayer<T, R, G>
where
G: GetSpan<T> + Clone,
{
fn clone(&self) -> Self {
Self {
get_span: self.get_span.clone(),
_p: PhantomData,
}
}
}
// === impl MakeService ===
impl<M, T, R, G> tower_service::Service<T> for MakeService<M, T, R, G>
where
M: tower_util::MakeService<T, R>,
G: GetSpan<T>,
{
type Response = Service<M::Service>;
type Error = M::MakeError;
type Future = MakeFuture<M::Future>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready()
}
fn call(&mut self, target: T) -> Self::Future {
let span = self.get_span.span_for(&target);
let inner = self.inner.make_service(target);
MakeFuture {
span: Some(span),
inner,
}
}
}
impl<F> Future for MakeFuture<F>
where
F: Future,
{
type Item = Service<F::Item>;
type Error = F::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let inner = {
let _guard = self.span.as_ref().map(tracing::Span::enter);
futures::try_ready!(self.inner.poll())
};
let span = self.span.take().expect("polled after ready");
Ok(Async::Ready(Service::new(inner, span)))
}
}
impl<M, T, R, G> MakeService<M, T, R, G>
where
M: tower_util::MakeService<T, R>,
G: GetSpan<T>,
{
pub fn new(inner: M, get_span: G) -> Self {
MakeService {
get_span,
inner,
_p: PhantomData,
}
}
}
}
// === impl Service ===
impl<S> Service<S> {
pub fn new(inner: S, span: tracing::Span) -> Self {
Self { span, inner }
}
}
impl<S, R> tower_service::Service<R> for Service<S>
where
S: tower_service::Service<R>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
let _enter = self.span.enter();
self.inner.poll_ready()
}
fn call(&mut self, request: R) -> Self::Future {
let _enter = self.span.enter();
self.inner.call(request)
}
}
impl<S> Clone for Service<S>
where
S: Clone,
{
fn clone(&self) -> Self {
Service {
span: self.span.clone(),
inner: self.inner.clone(),
}
}
}