opentelemetry: update otel to 0.18.0 (#2303)

## Motivation

Support the latest OpenTelemetry specification.

## Solution

Update `opentelemetry` to the latest `0.18.x` release. Breaking changes
in the metrics spec have removed value recorders and added histograms so
the metrics layer's `value.` prefix has been changed to `histogram.` and
behaves accordingly. Additionally the `PushController` configuration for
the metrics layer has been simplified to accept a `BasicController` that
can act in either push or pull modes. Finally trace sampling in the
sdk's `PreSampledTracer` impl has been updated to match the sampling
logic in https://github.com/open-telemetry/opentelemetry-rust/pull/839.

* Update MSRV to 1.56
* Update examples
* Fix async-trait dep
* Update msrv action

Co-authored-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
Julian Tescher 2022-09-16 13:29:35 -07:00 committed by GitHub
parent 3a193f3d30
commit 10a4b13ddc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 285 additions and 260 deletions

View File

@ -214,6 +214,7 @@ jobs:
--exclude=tracing-appender
--exclude=tracing-examples
--exclude=tracing-futures
--exclude=tracing-opentelemetry
toolchain: ${{ env.MSRV }}
# TODO: remove this once tracing's MSRV is bumped.

View File

@ -52,8 +52,8 @@ inferno = "0.11.6"
tempfile = "3"
# opentelemetry example
opentelemetry = { version = "0.17.0", default-features = false, features = ["trace"] }
opentelemetry-jaeger = "0.16.0"
opentelemetry = { version = "0.18.0", default-features = false, features = ["trace"] }
opentelemetry-jaeger = "0.17.0"
# fmt examples
snafu = "0.6.10"

View File

@ -19,7 +19,7 @@ fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// Install an otel pipeline with a simple span processor that exports data one at a time when
// spans end. See the `install_batch` option on each exporter's pipeline builder to see how to
// export in batches.
let tracer = opentelemetry_jaeger::new_pipeline()
let tracer = opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("report_example")
.install_simple()?;
let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer);

View File

@ -17,7 +17,7 @@ categories = [
keywords = ["tracing", "opentelemetry", "jaeger", "zipkin", "async"]
license = "MIT"
edition = "2018"
rust-version = "1.46.0"
rust-version = "1.56.0"
[features]
default = ["tracing-log", "metrics"]
@ -25,7 +25,7 @@ default = ["tracing-log", "metrics"]
metrics = ["opentelemetry/metrics"]
[dependencies]
opentelemetry = { version = "0.17.0", default-features = false, features = ["trace"] }
opentelemetry = { version = "0.18.0", default-features = false, features = ["trace"] }
tracing = { path = "../tracing", version = "0.1.35", default-features = false, features = ["std"] }
tracing-core = { path = "../tracing-core", version = "0.1.28" }
tracing-subscriber = { path = "../tracing-subscriber", version = "0.3.0", default-features = false, features = ["registry", "std"] }
@ -39,7 +39,7 @@ thiserror = { version = "1.0.31", optional = true }
[dev-dependencies]
async-trait = "0.1.56"
criterion = { version = "0.3.6", default-features = false }
opentelemetry-jaeger = "0.16.0"
opentelemetry-jaeger = "0.17.0"
futures-util = { version = "0.3", default-features = false }
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
@ -53,4 +53,4 @@ harness = false
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
rustdoc-args = ["--cfg", "docsrs"]

View File

@ -50,7 +50,7 @@ The crate provides the following types:
[`tracing`]: https://crates.io/crates/tracing
[OpenTelemetry]: https://opentelemetry.io/
*Compiler support: [requires `rustc` 1.49+][msrv]*
*Compiler support: [requires `rustc` 1.56+][msrv]*
[msrv]: #supported-rust-versions
@ -110,7 +110,7 @@ $ firefox http://localhost:16686/
## Supported Rust Versions
Tracing Opentelemetry is built against the latest stable release. The minimum
supported version is 1.46. The current Tracing version is not guaranteed to
supported version is 1.56. The current Tracing version is not guaranteed to
build on Rust versions earlier than the minimum supported version.
Tracing follows the same compiler support policies as the rest of the Tokio

View File

@ -1,11 +1,10 @@
use crate::{OtelData, PreSampledTracer};
use once_cell::unsync;
use opentelemetry::{
trace::{self as otel, noop, TraceContextExt},
Context as OtelContext, Key, KeyValue, Value,
trace::{self as otel, noop, OrderMap, TraceContextExt},
Context as OtelContext, Key, KeyValue, StringValue, Value,
};
use std::any::TypeId;
use std::borrow::Cow;
use std::fmt;
use std::marker;
use std::thread;
@ -105,12 +104,11 @@ fn str_to_span_kind(s: &str) -> Option<otel::SpanKind> {
}
}
fn str_to_status_code(s: &str) -> Option<otel::StatusCode> {
fn str_to_status(s: &str) -> otel::Status {
match s {
s if s.eq_ignore_ascii_case("unset") => Some(otel::StatusCode::Unset),
s if s.eq_ignore_ascii_case("ok") => Some(otel::StatusCode::Ok),
s if s.eq_ignore_ascii_case("error") => Some(otel::StatusCode::Error),
_ => None,
s if s.eq_ignore_ascii_case("ok") => otel::Status::Ok,
s if s.eq_ignore_ascii_case("error") => otel::Status::error(""),
_ => otel::Status::Unset,
}
}
@ -220,7 +218,7 @@ impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> {
let mut next_err = value.source();
while let Some(err) = next_err {
chain.push(Cow::Owned(err.to_string()));
chain.push(StringValue::from(err.to_string()));
next_err = err.source();
}
@ -245,7 +243,7 @@ impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> {
if self.exception_config.propagate {
if let Some(span) = &mut self.span_builder {
if let Some(attrs) = span.attributes.as_mut() {
attrs.push(Key::new(FIELD_EXCEPTION_MESSAGE).string(error_msg.clone()));
attrs.insert(Key::new(FIELD_EXCEPTION_MESSAGE), error_msg.clone().into());
// NOTE: This is actually not the stacktrace of the exception. This is
// the "source chain". It represents the heirarchy of errors from the
@ -253,7 +251,10 @@ impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> {
// of the callsites in the code that led to the error happening.
// `std::error::Error::backtrace` is a nightly-only API and cannot be
// used here until the feature is stabilized.
attrs.push(Key::new(FIELD_EXCEPTION_STACKTRACE).array(chain.clone()));
attrs.insert(
Key::new(FIELD_EXCEPTION_STACKTRACE),
Value::Array(chain.clone().into()),
);
}
}
}
@ -288,7 +289,7 @@ impl<'a> SpanAttributeVisitor<'a> {
fn record(&mut self, attribute: KeyValue) {
debug_assert!(self.span_builder.attributes.is_some());
if let Some(v) = self.span_builder.attributes.as_mut() {
v.push(attribute);
v.insert(attribute.key, attribute.value);
}
}
}
@ -322,9 +323,9 @@ impl<'a> field::Visit for SpanAttributeVisitor<'a> {
match field.name() {
SPAN_NAME_FIELD => self.span_builder.name = value.to_string().into(),
SPAN_KIND_FIELD => self.span_builder.span_kind = str_to_span_kind(value),
SPAN_STATUS_CODE_FIELD => self.span_builder.status_code = str_to_status_code(value),
SPAN_STATUS_CODE_FIELD => self.span_builder.status = str_to_status(value),
SPAN_STATUS_MESSAGE_FIELD => {
self.span_builder.status_message = Some(value.to_owned().into())
self.span_builder.status = otel::Status::error(value.to_string())
}
_ => self.record(KeyValue::new(field.name(), value.to_string())),
}
@ -341,10 +342,10 @@ impl<'a> field::Visit for SpanAttributeVisitor<'a> {
self.span_builder.span_kind = str_to_span_kind(&format!("{:?}", value))
}
SPAN_STATUS_CODE_FIELD => {
self.span_builder.status_code = str_to_status_code(&format!("{:?}", value))
self.span_builder.status = str_to_status(&format!("{:?}", value))
}
SPAN_STATUS_MESSAGE_FIELD => {
self.span_builder.status_message = Some(format!("{:?}", value).into())
self.span_builder.status = otel::Status::error(format!("{:?}", value))
}
_ => self.record(Key::new(field.name()).string(format!("{:?}", value))),
}
@ -363,7 +364,7 @@ impl<'a> field::Visit for SpanAttributeVisitor<'a> {
let mut next_err = value.source();
while let Some(err) = next_err {
chain.push(Cow::Owned(err.to_string()));
chain.push(StringValue::from(err.to_string()));
next_err = err.source();
}
@ -405,7 +406,7 @@ where
/// use tracing_subscriber::Registry;
///
/// // Create a jaeger exporter pipeline for a `trace_demo` service.
/// let tracer = opentelemetry_jaeger::new_pipeline()
/// let tracer = opentelemetry_jaeger::new_agent_pipeline()
/// .with_service_name("trace_demo")
/// .install_simple()
/// .expect("Error initializing Jaeger exporter");
@ -446,7 +447,7 @@ where
/// use tracing_subscriber::Registry;
///
/// // Create a jaeger exporter pipeline for a `trace_demo` service.
/// let tracer = opentelemetry_jaeger::new_pipeline()
/// let tracer = opentelemetry_jaeger::new_agent_pipeline()
/// .with_service_name("trace_demo")
/// .install_simple()
/// .expect("Error initializing Jaeger exporter");
@ -684,7 +685,7 @@ where
builder.trace_id = Some(self.tracer.new_trace_id());
}
let builder_attrs = builder.attributes.get_or_insert(Vec::with_capacity(
let builder_attrs = builder.attributes.get_or_insert(OrderMap::with_capacity(
attrs.fields().len() + self.extra_span_attrs(),
));
@ -692,26 +693,26 @@ where
let meta = attrs.metadata();
if let Some(filename) = meta.file() {
builder_attrs.push(KeyValue::new("code.filepath", filename));
builder_attrs.insert("code.filepath".into(), filename.into());
}
if let Some(module) = meta.module_path() {
builder_attrs.push(KeyValue::new("code.namespace", module));
builder_attrs.insert("code.namespace".into(), module.into());
}
if let Some(line) = meta.line() {
builder_attrs.push(KeyValue::new("code.lineno", line as i64));
builder_attrs.insert("code.lineno".into(), (line as i64).into());
}
}
if self.with_threads {
THREAD_ID.with(|id| builder_attrs.push(KeyValue::new("thread.id", **id as i64)));
THREAD_ID.with(|id| builder_attrs.insert("thread.id".into(), (**id as i64).into()));
if let Some(name) = std::thread::current().name() {
// TODO(eliza): it's a bummer that we have to allocate here, but
// we can't easily get the string as a `static`. it would be
// nice if `opentelemetry` could also take `Arc<str>`s as
// `String` values...
builder_attrs.push(KeyValue::new("thread.name", name.to_owned()));
builder_attrs.insert("thread.name".into(), name.to_owned().into());
}
}
@ -845,8 +846,10 @@ where
});
if let Some(OtelData { builder, .. }) = extensions.get_mut::<OtelData>() {
if builder.status_code.is_none() && *meta.level() == tracing_core::Level::ERROR {
builder.status_code = Some(otel::StatusCode::Error);
if builder.status == otel::Status::Unset
&& *meta.level() == tracing_core::Level::ERROR
{
builder.status = otel::Status::error("")
}
if self.location {
@ -904,15 +907,14 @@ where
if self.tracked_inactivity {
// Append busy/idle timings when enabled.
if let Some(timings) = extensions.get_mut::<Timings>() {
let busy_ns = KeyValue::new("busy_ns", timings.busy);
let idle_ns = KeyValue::new("idle_ns", timings.idle);
let busy_ns = Key::new("busy_ns");
let idle_ns = Key::new("idle_ns");
if let Some(ref mut attributes) = builder.attributes {
attributes.push(busy_ns);
attributes.push(idle_ns);
} else {
builder.attributes = Some(vec![busy_ns, idle_ns]);
}
let attributes = builder
.attributes
.get_or_insert_with(|| OrderMap::with_capacity(2));
attributes.insert(busy_ns, timings.busy.into());
attributes.insert(idle_ns, timings.idle.into());
}
}
@ -965,7 +967,10 @@ fn thread_id_integer(id: thread::ThreadId) -> u64 {
mod tests {
use super::*;
use crate::OtelData;
use opentelemetry::trace::{noop, SpanKind, TraceFlags};
use opentelemetry::{
trace::{noop, TraceFlags},
StringValue,
};
use std::{
borrow::Cow,
collections::HashMap,
@ -1043,7 +1048,7 @@ mod tests {
false
}
fn set_attribute(&mut self, _attribute: KeyValue) {}
fn set_status(&mut self, _code: otel::StatusCode, _message: String) {}
fn set_status(&mut self, _status: otel::Status) {}
fn update_name<T: Into<Cow<'static, str>>>(&mut self, _new_name: T) {}
fn end_with_timestamp(&mut self, _timestamp: SystemTime) {}
}
@ -1103,7 +1108,7 @@ mod tests {
let subscriber = tracing_subscriber::registry().with(layer().with_tracer(tracer.clone()));
tracing::subscriber::with_default(subscriber, || {
tracing::debug_span!("request", otel.kind = %SpanKind::Server);
tracing::debug_span!("request", otel.kind = "server");
});
let recorded_kind = tracer.with_data(|data| data.builder.span_kind.clone());
@ -1116,11 +1121,19 @@ mod tests {
let subscriber = tracing_subscriber::registry().with(layer().with_tracer(tracer.clone()));
tracing::subscriber::with_default(subscriber, || {
tracing::debug_span!("request", otel.status_code = ?otel::StatusCode::Ok);
tracing::debug_span!("request", otel.status_code = ?otel::Status::Ok);
});
let recorded_status = tracer
.0
.lock()
.unwrap()
.as_ref()
.unwrap()
.builder
.status
.clone();
let recorded_status_code = tracer.with_data(|data| data.builder.status_code);
assert_eq!(recorded_status_code, Some(otel::StatusCode::Ok))
assert_eq!(recorded_status, otel::Status::Ok)
}
#[test]
@ -1134,8 +1147,17 @@ mod tests {
tracing::debug_span!("request", otel.status_message = message);
});
let recorded_status_message = tracer.with_data(|data| data.builder.status_message.clone());
assert_eq!(recorded_status_message, Some(message.into()))
let recorded_status_message = tracer
.0
.lock()
.unwrap()
.as_ref()
.unwrap()
.builder
.status
.clone();
assert_eq!(recorded_status_message, otel::Status::error(message))
}
#[test]
@ -1153,7 +1175,7 @@ mod tests {
let _g = existing_cx.attach();
tracing::subscriber::with_default(subscriber, || {
tracing::debug_span!("request", otel.kind = %SpanKind::Server);
tracing::debug_span!("request", otel.kind = "server");
});
let recorded_trace_id =
@ -1177,7 +1199,7 @@ mod tests {
let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone());
let keys = attributes
.iter()
.map(|attr| attr.key.as_str())
.map(|(key, _)| key.as_str())
.collect::<Vec<&str>>();
assert!(keys.contains(&"idle_ns"));
assert!(keys.contains(&"busy_ns"));
@ -1217,7 +1239,7 @@ mod tests {
let key_values = attributes
.into_iter()
.map(|attr| (attr.key.as_str().to_owned(), attr.value))
.map(|(key, value)| (key.as_str().to_owned(), value))
.collect::<HashMap<_, _>>();
assert_eq!(key_values["error"].as_str(), "user error");
@ -1225,8 +1247,8 @@ mod tests {
key_values["error.chain"],
Value::Array(
vec![
Cow::Borrowed("intermediate error"),
Cow::Borrowed("base error")
StringValue::from("intermediate error"),
StringValue::from("base error")
]
.into()
)
@ -1237,8 +1259,8 @@ mod tests {
key_values[FIELD_EXCEPTION_STACKTRACE],
Value::Array(
vec![
Cow::Borrowed("intermediate error"),
Cow::Borrowed("base error")
StringValue::from("intermediate error"),
StringValue::from("base error")
]
.into()
)
@ -1258,7 +1280,7 @@ mod tests {
let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone());
let keys = attributes
.iter()
.map(|attr| attr.key.as_str())
.map(|(key, _)| key.as_str())
.collect::<Vec<&str>>();
assert!(keys.contains(&"code.filepath"));
assert!(keys.contains(&"code.namespace"));
@ -1278,7 +1300,7 @@ mod tests {
let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone());
let keys = attributes
.iter()
.map(|attr| attr.key.as_str())
.map(|(key, _)| key.as_str())
.collect::<Vec<&str>>();
assert!(!keys.contains(&"code.filepath"));
assert!(!keys.contains(&"code.namespace"));
@ -1290,7 +1312,7 @@ mod tests {
let thread = thread::current();
let expected_name = thread
.name()
.map(|name| Value::String(Cow::Owned(name.to_owned())));
.map(|name| Value::String(name.to_owned().into()));
let expected_id = Value::I64(thread_id_integer(thread.id()) as i64);
let tracer = TestTracer(Arc::new(Mutex::new(None)));
@ -1304,7 +1326,7 @@ mod tests {
let attributes = tracer
.with_data(|data| data.builder.attributes.as_ref().unwrap().clone())
.drain(..)
.map(|keyval| (keyval.key.as_str().to_string(), keyval.value))
.map(|(key, value)| (key.as_str().to_string(), value))
.collect::<HashMap<_, _>>();
assert_eq!(attributes.get("thread.name"), expected_name.as_ref());
assert_eq!(attributes.get("thread.id"), Some(&expected_id));
@ -1323,7 +1345,7 @@ mod tests {
let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone());
let keys = attributes
.iter()
.map(|attr| attr.key.as_str())
.map(|(key, _)| key.as_str())
.collect::<Vec<&str>>();
assert!(!keys.contains(&"thread.name"));
assert!(!keys.contains(&"thread.id"));
@ -1365,7 +1387,7 @@ mod tests {
let key_values = attributes
.into_iter()
.map(|attr| (attr.key.as_str().to_owned(), attr.value))
.map(|(key, value)| (key.as_str().to_owned(), value))
.collect::<HashMap<_, _>>();
assert_eq!(key_values[FIELD_EXCEPTION_MESSAGE].as_str(), "user error");
@ -1373,8 +1395,8 @@ mod tests {
key_values[FIELD_EXCEPTION_STACKTRACE],
Value::Array(
vec![
Cow::Borrowed("intermediate error"),
Cow::Borrowed("base error")
StringValue::from("intermediate error"),
StringValue::from("base error")
]
.into()
)

View File

@ -9,7 +9,7 @@
//! [OpenTelemetry]: https://opentelemetry.io
//! [`tracing`]: https://github.com/tokio-rs/tracing
//!
//! *Compiler support: [requires `rustc` 1.49+][msrv]*
//! *Compiler support: [requires `rustc` 1.56+][msrv]*
//!
//! [msrv]: #supported-rust-versions
//!
@ -86,7 +86,7 @@
//! ## Supported Rust Versions
//!
//! Tracing is built against the latest stable release. The minimum supported
//! version is 1.49. The current Tracing version is not guaranteed to build on
//! version is 1.56. The current Tracing version is not guaranteed to build on
//! Rust versions earlier than the minimum supported version.
//!
//! Tracing follows the same compiler support policies as the rest of the Tokio

View File

@ -3,8 +3,9 @@ use tracing::{field::Visit, Subscriber};
use tracing_core::Field;
use opentelemetry::{
metrics::{Counter, Meter, MeterProvider, UpDownCounter, ValueRecorder},
sdk::metrics::PushController,
metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter},
sdk::metrics::controllers::BasicController,
Context as OtelContext,
};
use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer};
@ -13,7 +14,7 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "tracing/tracing-opentelemetry";
const METRIC_PREFIX_MONOTONIC_COUNTER: &str = "monotonic_counter.";
const METRIC_PREFIX_COUNTER: &str = "counter.";
const METRIC_PREFIX_VALUE: &str = "value.";
const METRIC_PREFIX_HISTOGRAM: &str = "histogram.";
const I64_MAX: u64 = i64::MAX as u64;
#[derive(Default)]
@ -22,9 +23,9 @@ pub(crate) struct Instruments {
f64_counter: MetricsMap<Counter<f64>>,
i64_up_down_counter: MetricsMap<UpDownCounter<i64>>,
f64_up_down_counter: MetricsMap<UpDownCounter<f64>>,
u64_value_recorder: MetricsMap<ValueRecorder<u64>>,
i64_value_recorder: MetricsMap<ValueRecorder<i64>>,
f64_value_recorder: MetricsMap<ValueRecorder<f64>>,
u64_histogram: MetricsMap<Histogram<u64>>,
i64_histogram: MetricsMap<Histogram<i64>>,
f64_histogram: MetricsMap<Histogram<f64>>,
}
type MetricsMap<T> = RwLock<HashMap<&'static str, T>>;
@ -35,14 +36,15 @@ pub(crate) enum InstrumentType {
CounterF64(f64),
UpDownCounterI64(i64),
UpDownCounterF64(f64),
ValueRecorderU64(u64),
ValueRecorderI64(i64),
ValueRecorderF64(f64),
HistogramU64(u64),
HistogramI64(i64),
HistogramF64(f64),
}
impl Instruments {
pub(crate) fn update_metric(
&self,
cx: &OtelContext,
meter: &Meter,
instrument_type: InstrumentType,
metric_name: &'static str,
@ -76,7 +78,7 @@ impl Instruments {
&self.u64_counter,
metric_name,
|| meter.u64_counter(metric_name).init(),
|ctr| ctr.add(value, &[]),
|ctr| ctr.add(cx, value, &[]),
);
}
InstrumentType::CounterF64(value) => {
@ -84,7 +86,7 @@ impl Instruments {
&self.f64_counter,
metric_name,
|| meter.f64_counter(metric_name).init(),
|ctr| ctr.add(value, &[]),
|ctr| ctr.add(cx, value, &[]),
);
}
InstrumentType::UpDownCounterI64(value) => {
@ -92,7 +94,7 @@ impl Instruments {
&self.i64_up_down_counter,
metric_name,
|| meter.i64_up_down_counter(metric_name).init(),
|ctr| ctr.add(value, &[]),
|ctr| ctr.add(cx, value, &[]),
);
}
InstrumentType::UpDownCounterF64(value) => {
@ -100,31 +102,31 @@ impl Instruments {
&self.f64_up_down_counter,
metric_name,
|| meter.f64_up_down_counter(metric_name).init(),
|ctr| ctr.add(value, &[]),
|ctr| ctr.add(cx, value, &[]),
);
}
InstrumentType::ValueRecorderU64(value) => {
InstrumentType::HistogramU64(value) => {
update_or_insert(
&self.u64_value_recorder,
&self.u64_histogram,
metric_name,
|| meter.u64_value_recorder(metric_name).init(),
|rec| rec.record(value, &[]),
|| meter.u64_histogram(metric_name).init(),
|rec| rec.record(cx, value, &[]),
);
}
InstrumentType::ValueRecorderI64(value) => {
InstrumentType::HistogramI64(value) => {
update_or_insert(
&self.i64_value_recorder,
&self.i64_histogram,
metric_name,
|| meter.i64_value_recorder(metric_name).init(),
|rec| rec.record(value, &[]),
|| meter.i64_histogram(metric_name).init(),
|rec| rec.record(cx, value, &[]),
);
}
InstrumentType::ValueRecorderF64(value) => {
InstrumentType::HistogramF64(value) => {
update_or_insert(
&self.f64_value_recorder,
&self.f64_histogram,
metric_name,
|| meter.f64_value_recorder(metric_name).init(),
|rec| rec.record(value, &[]),
|| meter.f64_histogram(metric_name).init(),
|rec| rec.record(cx, value, &[]),
);
}
};
@ -142,8 +144,10 @@ impl<'a> Visit for MetricVisitor<'a> {
}
fn record_u64(&mut self, field: &Field, value: u64) {
let cx = OtelContext::current();
if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) {
self.instruments.update_metric(
&cx,
self.meter,
InstrumentType::CounterU64(value),
metric_name,
@ -151,6 +155,7 @@ impl<'a> Visit for MetricVisitor<'a> {
} else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) {
if value <= I64_MAX {
self.instruments.update_metric(
&cx,
self.meter,
InstrumentType::UpDownCounterI64(value as i64),
metric_name,
@ -163,54 +168,63 @@ impl<'a> Visit for MetricVisitor<'a> {
value
);
}
} else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_VALUE) {
} else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) {
self.instruments.update_metric(
&cx,
self.meter,
InstrumentType::ValueRecorderU64(value),
InstrumentType::HistogramU64(value),
metric_name,
);
}
}
fn record_f64(&mut self, field: &Field, value: f64) {
let cx = OtelContext::current();
if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) {
self.instruments.update_metric(
&cx,
self.meter,
InstrumentType::CounterF64(value),
metric_name,
);
} else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) {
self.instruments.update_metric(
&cx,
self.meter,
InstrumentType::UpDownCounterF64(value),
metric_name,
);
} else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_VALUE) {
} else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) {
self.instruments.update_metric(
&cx,
self.meter,
InstrumentType::ValueRecorderF64(value),
InstrumentType::HistogramF64(value),
metric_name,
);
}
}
fn record_i64(&mut self, field: &Field, value: i64) {
let cx = OtelContext::current();
if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) {
self.instruments.update_metric(
&cx,
self.meter,
InstrumentType::CounterU64(value as u64),
metric_name,
);
} else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) {
self.instruments.update_metric(
&cx,
self.meter,
InstrumentType::UpDownCounterI64(value),
metric_name,
);
} else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_VALUE) {
} else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) {
self.instruments.update_metric(
&cx,
self.meter,
InstrumentType::ValueRecorderI64(value),
InstrumentType::HistogramI64(value),
metric_name,
);
}
@ -232,14 +246,14 @@ impl<'a> Visit for MetricVisitor<'a> {
/// use tracing_opentelemetry::MetricsLayer;
/// use tracing_subscriber::layer::SubscriberExt;
/// use tracing_subscriber::Registry;
/// # use opentelemetry::sdk::metrics::PushController;
/// # use opentelemetry::sdk::metrics::controllers::BasicController;
///
/// // Constructing a PushController is out-of-scope for the docs here, but there
/// // Constructing a BasicController is out-of-scope for the docs here, but there
/// // are examples in the opentelemetry repository. See:
/// // https://github.com/open-telemetry/opentelemetry-rust/blob/c13a11e62a68eacd8c41a0742a0d097808e28fbd/examples/basic-otlp/src/main.rs#L39-L53
/// # let push_controller: PushController = unimplemented!();
/// // https://github.com/open-telemetry/opentelemetry-rust/blob/d4b9befea04bcc7fc19319a6ebf5b5070131c486/examples/basic-otlp/src/main.rs#L35-L52
/// # let controller: BasicController = unimplemented!();
///
/// let opentelemetry_metrics = MetricsLayer::new(push_controller);
/// let opentelemetry_metrics = MetricsLayer::new(controller);
/// let subscriber = Registry::default().with(opentelemetry_metrics);
/// tracing::subscriber::set_global_default(subscriber).unwrap();
/// ```
@ -329,10 +343,9 @@ pub struct MetricsLayer {
impl MetricsLayer {
/// Create a new instance of MetricsLayer.
pub fn new(push_controller: PushController) -> Self {
let meter = push_controller
.provider()
.meter(INSTRUMENTATION_LIBRARY_NAME, Some(CARGO_PKG_VERSION));
pub fn new(controller: BasicController) -> Self {
let meter =
controller.versioned_meter(INSTRUMENTATION_LIBRARY_NAME, Some(CARGO_PKG_VERSION), None);
MetricsLayer {
meter,
instruments: Default::default(),

View File

@ -1,9 +1,10 @@
use opentelemetry::sdk::trace::{SamplingDecision, SamplingResult, Tracer, TracerProvider};
use opentelemetry::sdk::trace::{Tracer, TracerProvider};
use opentelemetry::trace::OrderMap;
use opentelemetry::{
trace as otel,
trace::{
noop, SpanBuilder, SpanContext, SpanId, SpanKind, TraceContextExt, TraceFlags, TraceId,
TraceState,
noop, SamplingDecision, SamplingResult, SpanBuilder, SpanContext, SpanId, SpanKind,
TraceContextExt, TraceFlags, TraceId, TraceState,
},
Context as OtelContext,
};
@ -74,19 +75,18 @@ impl PreSampledTracer for Tracer {
let builder = &mut data.builder;
// Gather trace state
let (no_parent, trace_id, remote_parent, parent_trace_flags) =
current_trace_state(builder, parent_cx, &provider);
let (trace_id, parent_trace_flags) = current_trace_state(builder, parent_cx, &provider);
// Sample or defer to existing sampling decisions
let (flags, trace_state) = if let Some(result) = &builder.sampling_result {
process_sampling_result(result, parent_trace_flags)
} else if no_parent || remote_parent {
} else {
builder.sampling_result = Some(provider.config().sampler.should_sample(
Some(parent_cx),
trace_id,
&builder.name,
builder.span_kind.as_ref().unwrap_or(&SpanKind::Internal),
builder.attributes.as_deref().unwrap_or(&[]),
builder.attributes.as_ref().unwrap_or(&OrderMap::default()),
builder.links.as_deref().unwrap_or(&[]),
self.instrumentation_library(),
));
@ -95,12 +95,6 @@ impl PreSampledTracer for Tracer {
builder.sampling_result.as_ref().unwrap(),
parent_trace_flags,
)
} else {
// has parent that is local
Some((
parent_trace_flags,
parent_cx.span().span_context().trace_state().clone(),
))
}
.unwrap_or_default();
@ -126,18 +120,16 @@ fn current_trace_state(
builder: &SpanBuilder,
parent_cx: &OtelContext,
provider: &TracerProvider,
) -> (bool, TraceId, bool, TraceFlags) {
) -> (TraceId, TraceFlags) {
if parent_cx.has_active_span() {
let span = parent_cx.span();
let sc = span.span_context();
(false, sc.trace_id(), sc.is_remote(), sc.trace_flags())
(sc.trace_id(), sc.trace_flags())
} else {
(
true,
builder
.trace_id
.unwrap_or_else(|| provider.config().id_generator.new_trace_id()),
false,
Default::default(),
)
}

View File

@ -1,26 +1,21 @@
#![cfg(feature = "metrics")]
use async_trait::async_trait;
use futures_util::{Stream, StreamExt as _};
use opentelemetry::{
metrics::{Descriptor, InstrumentKind},
metrics::{Number, NumberKind},
metrics::MetricsError,
sdk::{
export::{
metrics::{
CheckpointSet, ExportKind, ExportKindFor, ExportKindSelector,
Exporter as MetricsExporter, Points, Sum,
},
trace::{SpanData, SpanExporter},
export::metrics::{
aggregation::{self, Histogram, Sum, TemporalitySelector},
InstrumentationLibraryReader,
},
metrics::{
aggregators::{ArrayAggregator, SumAggregator},
selectors::simple::Selector,
aggregators::{HistogramAggregator, SumAggregator},
controllers::BasicController,
processors,
sdk_api::{Descriptor, InstrumentKind, Number, NumberKind},
selectors,
},
},
Key, Value,
Context,
};
use std::cmp::Ordering;
use std::time::Duration;
use tracing::Subscriber;
use tracing_opentelemetry::MetricsLayer;
use tracing_subscriber::prelude::*;
@ -30,7 +25,7 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "tracing/tracing-opentelemetry";
#[tokio::test]
async fn u64_counter_is_exported() {
let subscriber = init_subscriber(
let (subscriber, exporter) = init_subscriber(
"hello_world".to_string(),
InstrumentKind::Counter,
NumberKind::U64,
@ -40,11 +35,13 @@ async fn u64_counter_is_exported() {
tracing::subscriber::with_default(subscriber, || {
tracing::info!(monotonic_counter.hello_world = 1_u64);
});
exporter.export().unwrap();
}
#[tokio::test]
async fn u64_counter_is_exported_i64_at_instrumentation_point() {
let subscriber = init_subscriber(
let (subscriber, exporter) = init_subscriber(
"hello_world2".to_string(),
InstrumentKind::Counter,
NumberKind::U64,
@ -54,11 +51,13 @@ async fn u64_counter_is_exported_i64_at_instrumentation_point() {
tracing::subscriber::with_default(subscriber, || {
tracing::info!(monotonic_counter.hello_world2 = 1_i64);
});
exporter.export().unwrap();
}
#[tokio::test]
async fn f64_counter_is_exported() {
let subscriber = init_subscriber(
let (subscriber, exporter) = init_subscriber(
"float_hello_world".to_string(),
InstrumentKind::Counter,
NumberKind::F64,
@ -68,11 +67,13 @@ async fn f64_counter_is_exported() {
tracing::subscriber::with_default(subscriber, || {
tracing::info!(monotonic_counter.float_hello_world = 1.000000123_f64);
});
exporter.export().unwrap();
}
#[tokio::test]
async fn i64_up_down_counter_is_exported() {
let subscriber = init_subscriber(
let (subscriber, exporter) = init_subscriber(
"pebcak".to_string(),
InstrumentKind::UpDownCounter,
NumberKind::I64,
@ -82,11 +83,13 @@ async fn i64_up_down_counter_is_exported() {
tracing::subscriber::with_default(subscriber, || {
tracing::info!(counter.pebcak = -5_i64);
});
exporter.export().unwrap();
}
#[tokio::test]
async fn i64_up_down_counter_is_exported_u64_at_instrumentation_point() {
let subscriber = init_subscriber(
let (subscriber, exporter) = init_subscriber(
"pebcak2".to_string(),
InstrumentKind::UpDownCounter,
NumberKind::I64,
@ -96,11 +99,13 @@ async fn i64_up_down_counter_is_exported_u64_at_instrumentation_point() {
tracing::subscriber::with_default(subscriber, || {
tracing::info!(counter.pebcak2 = 5_u64);
});
exporter.export().unwrap();
}
#[tokio::test]
async fn f64_up_down_counter_is_exported() {
let subscriber = init_subscriber(
let (subscriber, exporter) = init_subscriber(
"pebcak_blah".to_string(),
InstrumentKind::UpDownCounter,
NumberKind::F64,
@ -110,13 +115,15 @@ async fn f64_up_down_counter_is_exported() {
tracing::subscriber::with_default(subscriber, || {
tracing::info!(counter.pebcak_blah = 99.123_f64);
});
exporter.export().unwrap();
}
#[tokio::test]
async fn u64_value_is_exported() {
let subscriber = init_subscriber(
async fn u64_histogram_is_exported() {
let (subscriber, exporter) = init_subscriber(
"abcdefg".to_string(),
InstrumentKind::ValueRecorder,
InstrumentKind::Histogram,
NumberKind::U64,
Number::from(9_u64),
);
@ -124,13 +131,15 @@ async fn u64_value_is_exported() {
tracing::subscriber::with_default(subscriber, || {
tracing::info!(value.abcdefg = 9_u64);
});
exporter.export().unwrap();
}
#[tokio::test]
async fn i64_value_is_exported() {
let subscriber = init_subscriber(
async fn i64_histogram_is_exported() {
let (subscriber, exporter) = init_subscriber(
"abcdefg_auenatsou".to_string(),
InstrumentKind::ValueRecorder,
InstrumentKind::Histogram,
NumberKind::I64,
Number::from(-19_i64),
);
@ -138,13 +147,15 @@ async fn i64_value_is_exported() {
tracing::subscriber::with_default(subscriber, || {
tracing::info!(value.abcdefg_auenatsou = -19_i64);
});
exporter.export().unwrap();
}
#[tokio::test]
async fn f64_value_is_exported() {
let subscriber = init_subscriber(
async fn f64_histogram_is_exported() {
let (subscriber, exporter) = init_subscriber(
"abcdefg_racecar".to_string(),
InstrumentKind::ValueRecorder,
InstrumentKind::Histogram,
NumberKind::F64,
Number::from(777.0012_f64),
);
@ -152,6 +163,8 @@ async fn f64_value_is_exported() {
tracing::subscriber::with_default(subscriber, || {
tracing::info!(value.abcdefg_racecar = 777.0012_f64);
});
exporter.export().unwrap();
}
fn init_subscriber(
@ -159,24 +172,25 @@ fn init_subscriber(
expected_instrument_kind: InstrumentKind,
expected_number_kind: NumberKind,
expected_value: Number,
) -> impl Subscriber + 'static {
) -> (impl Subscriber + 'static, TestExporter) {
let controller = opentelemetry::sdk::metrics::controllers::basic(processors::factory(
selectors::simple::histogram(vec![-10.0, 100.0]),
aggregation::cumulative_temporality_selector(),
))
.build();
let exporter = TestExporter {
expected_metric_name,
expected_instrument_kind,
expected_number_kind,
expected_value,
controller: controller.clone(),
};
let push_controller = opentelemetry::sdk::metrics::controllers::push(
Selector::Exact,
ExportKindSelector::Stateless,
(
tracing_subscriber::registry().with(MetricsLayer::new(controller)),
exporter,
tokio::spawn,
delayed_interval,
)
.build();
tracing_subscriber::registry().with(MetricsLayer::new(push_controller))
}
#[derive(Clone, Debug)]
@ -185,100 +199,84 @@ struct TestExporter {
expected_instrument_kind: InstrumentKind,
expected_number_kind: NumberKind,
expected_value: Number,
controller: BasicController,
}
#[async_trait]
impl SpanExporter for TestExporter {
async fn export(
&mut self,
mut _batch: Vec<SpanData>,
) -> opentelemetry::sdk::export::trace::ExportResult {
Ok(())
}
}
impl TestExporter {
fn export(&self) -> Result<(), MetricsError> {
self.controller.collect(&Context::current())?;
self.controller.try_for_each(&mut |library, reader| {
reader.try_for_each(self, &mut |record| {
assert_eq!(self.expected_metric_name, record.descriptor().name());
assert_eq!(
self.expected_instrument_kind,
*record.descriptor().instrument_kind()
);
assert_eq!(
self.expected_number_kind,
*record.descriptor().number_kind()
);
match self.expected_instrument_kind {
InstrumentKind::Counter | InstrumentKind::UpDownCounter => {
let number = record
.aggregator()
.unwrap()
.as_any()
.downcast_ref::<SumAggregator>()
.unwrap()
.sum()
.unwrap();
impl MetricsExporter for TestExporter {
fn export(&self, checkpoint_set: &mut dyn CheckpointSet) -> opentelemetry::metrics::Result<()> {
checkpoint_set.try_for_each(self, &mut |record| {
assert_eq!(self.expected_metric_name, record.descriptor().name());
assert_eq!(
self.expected_instrument_kind,
*record.descriptor().instrument_kind()
);
assert_eq!(
self.expected_number_kind,
*record.descriptor().number_kind()
);
let number = match self.expected_instrument_kind {
InstrumentKind::Counter | InstrumentKind::UpDownCounter => record
.aggregator()
.unwrap()
.as_any()
.downcast_ref::<SumAggregator>()
.unwrap()
.sum()
.unwrap(),
InstrumentKind::ValueRecorder => record
.aggregator()
.unwrap()
.as_any()
.downcast_ref::<ArrayAggregator>()
.unwrap()
.points()
.unwrap()[0]
.clone(),
_ => panic!(
"InstrumentKind {:?} not currently supported!",
self.expected_instrument_kind
),
};
assert_eq!(
Ordering::Equal,
number
.partial_cmp(&NumberKind::U64, &self.expected_value)
.unwrap()
);
assert_eq!(
Ordering::Equal,
number
.partial_cmp(&NumberKind::U64, &self.expected_value)
.unwrap()
);
}
InstrumentKind::Histogram => {
let histogram = record
.aggregator()
.unwrap()
.as_any()
.downcast_ref::<HistogramAggregator>()
.unwrap()
.histogram()
.unwrap();
// The following are the same regardless of the individual metric.
assert_eq!(
INSTRUMENTATION_LIBRARY_NAME,
record.descriptor().instrumentation_library().name
);
assert_eq!(
CARGO_PKG_VERSION,
record.descriptor().instrumentation_version().unwrap()
);
assert_eq!(
Value::String("unknown_service".into()),
record
.resource()
.get(Key::new("service.name".to_string()))
.unwrap()
);
let counts = histogram.counts();
if dbg!(self.expected_value.to_i64(&self.expected_number_kind)) > 100 {
assert_eq!(counts, &[0.0, 0.0, 1.0]);
} else if self.expected_value.to_i64(&self.expected_number_kind) > 0 {
assert_eq!(counts, &[0.0, 1.0, 0.0]);
} else {
assert_eq!(counts, &[1.0, 0.0, 0.0]);
}
}
_ => panic!(
"InstrumentKind {:?} not currently supported!",
self.expected_instrument_kind
),
};
opentelemetry::metrics::Result::Ok(())
// The following are the same regardless of the individual metric.
assert_eq!(INSTRUMENTATION_LIBRARY_NAME, library.name);
assert_eq!(CARGO_PKG_VERSION, library.version.as_ref().unwrap());
Ok(())
})
})
}
}
impl ExportKindFor for TestExporter {
fn export_kind_for(&self, _descriptor: &Descriptor) -> ExportKind {
impl TemporalitySelector for TestExporter {
fn temporality_for(
&self,
_descriptor: &Descriptor,
_kind: &aggregation::AggregationKind,
) -> aggregation::Temporality {
// I don't think the value here makes a difference since
// we are just testing a single metric.
ExportKind::Cumulative
aggregation::Temporality::Cumulative
}
}
// From opentelemetry::sdk::util::
// For some reason I can't pull it in from the other crate, it gives
// could not find `util` in `sdk`
/// Helper which wraps `tokio::time::interval` and makes it return a stream
fn tokio_interval_stream(period: std::time::Duration) -> tokio_stream::wrappers::IntervalStream {
tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(period))
}
// https://github.com/open-telemetry/opentelemetry-rust/blob/2585d109bf90d53d57c91e19c758dca8c36f5512/examples/basic-otlp/src/main.rs#L34-L37
// Skip first immediate tick from tokio, not needed for async_std.
fn delayed_interval(duration: Duration) -> impl Stream<Item = tokio::time::Instant> {
tokio_interval_stream(duration).skip(0)
}

View File

@ -1,8 +1,8 @@
use async_trait::async_trait;
use futures_util::future::BoxFuture;
use opentelemetry::{
propagation::TextMapPropagator,
sdk::{
export::trace::{SpanData, SpanExporter},
export::trace::{ExportResult, SpanData, SpanExporter},
propagation::{BaggagePropagator, TextMapCompositePropagator, TraceContextPropagator},
trace::{Tracer, TracerProvider},
},
@ -158,15 +158,14 @@ fn build_sampled_context() -> (Context, impl Subscriber, TestExporter, TracerPro
#[derive(Clone, Default, Debug)]
struct TestExporter(Arc<Mutex<Vec<SpanData>>>);
#[async_trait]
impl SpanExporter for TestExporter {
async fn export(
&mut self,
mut batch: Vec<SpanData>,
) -> opentelemetry::sdk::export::trace::ExportResult {
if let Ok(mut inner) = self.0.lock() {
inner.append(&mut batch);
}
Ok(())
fn export(&mut self, mut batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
let spans = self.0.clone();
Box::pin(async move {
if let Ok(mut inner) = spans.lock() {
inner.append(&mut batch);
}
Ok(())
})
}
}