Eliza Weisman 2520f97964
fmt, subscriber: move fmt into subscriber (#311)
## Motivation

As discussed in #308, there are a large number of crates in this
repository, which can be confusing for users and can increase the
maintainance burden for maintainers. Also, the `tracing-fmt` and
`tracing-subscriber` crates both contain filtering implementations with
similar behaviour and APIs, but `tracing-subscriber`'s filter module
offers more advanced features (filtering on field values), and is usable
with any subscriber implementation. Two separate filter implementations
also has the potential to be confusing for users. 

## Solution

This branch moves most of the code from `tracing-fmt` into a module in
`tracing-subscriber`, and changes the `tracing-fmt` builder APIs to use
the `Filter` type in `tracing-subscriber`. The `tracing-subscriber/fmt`
feature flag can be used to disable the formatting subscriber when it is
not used.

The `tracing-fmt` crate has been updated to re-export the APIs from
`tracing-subscriber`, and marked as deprecated. Once we've published a
new version of `tracing-subscriber` with the format APIs, we can publish
a final release of `tracing-fmt` that will update the documentation &
mark all APIs as deprecated, so that users know to move to the
`tracing-subscriber` crate.

Refs: #308

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
2019-09-02 08:53:58 -07:00

181 lines
5.3 KiB
Rust

#![deny(rust_2018_idioms)]
use bytes::Bytes;
use futures::*;
use h2::Reason;
use http::{Request, Response};
use std::net::SocketAddr;
use string::{String, TryFrom};
use tokio::net::TcpStream;
use tokio::runtime::{Runtime, TaskExecutor};
use tower_h2::client::Connect;
use tower_h2::{Body, RecvBody};
use tower_service::Service;
use tower_util::MakeService;
use tracing_futures::Instrument;
use tracing_tower::InstrumentableService;
pub struct Conn(SocketAddr);
fn main() {
// Set the default subscriber to record all traces emitted by this example
// and by the `tracing_tower` library's helpers.
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_filter("h2_client=trace,tracing_tower=trace")
.finish();
let _ = tracing::subscriber::set_global_default(subscriber);
let mut rt = Runtime::new().unwrap();
let executor = rt.executor();
let addr = "[::1]:8888".parse().unwrap();
impl Service<()> for Conn {
type Response = TcpStream;
type Error = ::std::io::Error;
type Future = Box<dyn Future<Item = TcpStream, Error = ::std::io::Error> + Send>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(().into())
}
fn call(&mut self, _: ()) -> Self::Future {
tracing::debug!("connecting...");
let c = TcpStream::connect(&self.0)
.and_then(|tcp| {
tcp.set_nodelay(true)?;
tracing::info!("connected!");
Ok(tcp)
})
.map_err(|error| {
tracing::error!(%error);
error
});
Box::new(c)
}
}
let conn = Conn(addr).trace_requests(tracing::debug_span!("connect", remote = %addr));
let mut h2 = Connect::new(conn, Default::default(), executor.clone());
let req_span: fn(&http::Request<_>) -> tracing::Span = |req| {
let span = tracing::trace_span!(
"request",
req.method = ?req.method(),
req.path = ?req.uri().path(),
);
{
// TODO: this is a workaround because tracing-fmt doesn't honor
// overridden request parents.
let _enter = span.enter();
tracing::trace!(parent: &span, "sending request...");
}
span
};
let done = h2
.make_service(())
.map_err(|_| Reason::REFUSED_STREAM.into())
.and_then(move |h2| {
let h2 = h2.trace_requests(req_span);
Serial {
h2,
count: 10,
pending: None,
}
})
.map(|_| println!("done"))
.map_err(|e| println!("error: {:?}", e));
rt.spawn(done);
rt.shutdown_on_idle().wait().unwrap();
}
/// Avoids overflowing max concurrent streams
struct Serial {
count: usize,
h2: tracing_tower::request_span::Service<
tower_h2::client::Connection<TcpStream, TaskExecutor, tower_h2::NoBody>,
http::Request<tower_h2::NoBody>,
>,
pending: Option<Box<dyn Future<Item = (), Error = tower_h2::client::Error> + Send>>,
}
impl Future for Serial {
type Item = ();
type Error = tower_h2::client::Error;
fn poll(&mut self) -> Poll<(), Self::Error> {
loop {
if let Some(mut fut) = self.pending.take() {
if fut.poll()?.is_not_ready() {
self.pending = Some(fut);
return Ok(Async::NotReady);
}
}
if self.count == 0 {
return Ok(Async::Ready(()));
}
self.count -= 1;
let mut fut = {
let span = tracing::debug_span!("serial", req.number = self.count);
let _enter = span.enter();
self.h2
.call(mkreq())
.and_then(move |rsp| read_response(rsp).map_err(Into::into))
.instrument(span.clone())
};
if fut.poll()?.is_not_ready() {
self.pending = Some(Box::new(fut));
return Ok(Async::NotReady);
}
}
}
}
fn mkreq() -> Request<tower_h2::NoBody> {
Request::builder()
.method("GET")
.uri("http://[::1]:8888/")
.version(http::Version::HTTP_2)
.body(tower_h2::NoBody)
.unwrap()
}
fn read_response(rsp: Response<RecvBody>) -> tracing_futures::Instrumented<ReadResponse> {
let span = tracing::trace_span!("response");
let f = {
let _enter = span.enter();
let (parts, body) = rsp.into_parts();
tracing::debug!(rsp.status = %parts.status);
ReadResponse { body }
};
f.instrument(span)
}
struct ReadResponse {
body: RecvBody,
}
impl Future for ReadResponse {
type Item = ();
type Error = tower_h2::client::Error;
fn poll(&mut self) -> Poll<(), Self::Error> {
loop {
match try_ready!(self.body.poll_data()) {
None => return Ok(Async::Ready(())),
Some(b) => {
let b: Bytes = b.into();
{
let s = String::try_from(b).expect("decode utf8 string");
tracing::trace!(rsp.body = &*s);
}
}
}
}
}
}