mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
trace-core: Dispatchers unset themselves (#1033)
This branch changes `dispatcher::get_default` to unset the thread's current dispatcher while the reference to it is held by the closure. This prevents infinite loops if the subscriber calls code paths which emit events or construct spans. Note that this also means that nested calls to `get_default` inside of a `get_default` closure will receive a `None` dispatcher rather than the "actual" dispatcher. However, it was necessary to unset the default in `get_default` rather than in dispatch methods such as `Dispatch::enter`, as when those functions are called, the current state has already been borrowed. Before: ``` test enter_span ... bench: 3 ns/iter (+/- 0) test span_no_fields ... bench: 51 ns/iter (+/- 12) test span_repeatedly ... bench: 5,073 ns/iter (+/- 1,528) test span_with_fields ... bench: 56 ns/iter (+/- 49) test span_with_fields_record ... bench: 363 ns/iter (+/- 61) ``` After: ``` test enter_span ... bench: 3 ns/iter (+/- 0) test span_no_fields ... bench: 35 ns/iter (+/- 12) test span_repeatedly ... bench: 4,165 ns/iter (+/- 298) test span_with_fields ... bench: 48 ns/iter (+/- 12) test span_with_fields_record ... bench: 363 ns/iter (+/- 91) ``` Closes #1032 Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
parent
88b942652c
commit
4bfa4ffcdf
48
tokio-trace/tests/subscriber.rs
Normal file
48
tokio-trace/tests/subscriber.rs
Normal file
@ -0,0 +1,48 @@
|
||||
#[macro_use]
|
||||
extern crate tokio_trace;
|
||||
use tokio_trace::{
|
||||
span,
|
||||
subscriber::{with_default, Interest, Subscriber},
|
||||
Event, Level, Metadata,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn event_macros_dont_infinite_loop() {
|
||||
// This test ensures that an event macro within a subscriber
|
||||
// won't cause an infinite loop of events.
|
||||
struct TestSubscriber;
|
||||
impl Subscriber for TestSubscriber {
|
||||
fn register_callsite(&self, _: &Metadata) -> Interest {
|
||||
// Always return sometimes so that `enabled` will be called
|
||||
// (which can loop).
|
||||
Interest::sometimes()
|
||||
}
|
||||
|
||||
fn enabled(&self, meta: &Metadata) -> bool {
|
||||
assert!(meta.fields().iter().any(|f| f.name() == "foo"));
|
||||
event!(Level::TRACE, bar = false);
|
||||
true
|
||||
}
|
||||
|
||||
fn new_span(&self, _: &span::Attributes) -> span::Id {
|
||||
span::Id::from_u64(0xAAAA)
|
||||
}
|
||||
|
||||
fn record(&self, _: &span::Id, _: &span::Record) {}
|
||||
|
||||
fn record_follows_from(&self, _: &span::Id, _: &span::Id) {}
|
||||
|
||||
fn event(&self, event: &Event) {
|
||||
assert!(event.metadata().fields().iter().any(|f| f.name() == "foo"));
|
||||
event!(Level::TRACE, baz = false);
|
||||
}
|
||||
|
||||
fn enter(&self, _: &span::Id) {}
|
||||
|
||||
fn exit(&self, _: &span::Id) {}
|
||||
}
|
||||
|
||||
with_default(TestSubscriber, || {
|
||||
event!(Level::TRACE, foo = false);
|
||||
})
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
//! Dispatches trace events to `Subscriber`s.
|
||||
//! Dispatches trace events to `Subscriber`s.c
|
||||
use {
|
||||
callsite, span,
|
||||
subscriber::{self, Subscriber},
|
||||
@ -7,7 +7,7 @@ use {
|
||||
|
||||
use std::{
|
||||
any::Any,
|
||||
cell::RefCell,
|
||||
cell::{Cell, RefCell},
|
||||
fmt,
|
||||
sync::{Arc, Weak},
|
||||
};
|
||||
@ -21,9 +21,30 @@ pub struct Dispatch {
|
||||
}
|
||||
|
||||
thread_local! {
|
||||
static CURRENT_DISPATCH: RefCell<Dispatch> = RefCell::new(Dispatch::none());
|
||||
static CURRENT_STATE: State = State {
|
||||
default: RefCell::new(Dispatch::none()),
|
||||
can_enter: Cell::new(true),
|
||||
};
|
||||
}
|
||||
|
||||
/// The dispatch state of a thread.
|
||||
struct State {
|
||||
/// This thread's current default dispatcher.
|
||||
default: RefCell<Dispatch>,
|
||||
/// Whether or not we can currently begin dispatching a trace event.
|
||||
///
|
||||
/// This is set to `false` when functions such as `enter`, `exit`, `event`,
|
||||
/// and `new_span` are called on this thread's default dispatcher, to
|
||||
/// prevent further trace events triggered inside those functions from
|
||||
/// creating an infinite recursion. When we finish handling a dispatch, this
|
||||
/// is set back to `true`.
|
||||
can_enter: Cell<bool>,
|
||||
}
|
||||
|
||||
/// A guard that resets the current default dispatcher to the prior
|
||||
/// default dispatcher when dropped.
|
||||
struct ResetGuard(Option<Dispatch>);
|
||||
|
||||
/// Sets this dispatch as the default for the duration of a closure.
|
||||
///
|
||||
/// The default dispatcher is used when creating a new [span] or
|
||||
@ -35,34 +56,44 @@ thread_local! {
|
||||
/// [`Subscriber`]: ../subscriber/trait.Subscriber.html
|
||||
/// [`Event`]: ../event/struct.Event.html
|
||||
pub fn with_default<T>(dispatcher: &Dispatch, f: impl FnOnce() -> T) -> T {
|
||||
// A drop guard that resets CURRENT_DISPATCH to the prior dispatcher.
|
||||
// Using this (rather than simply resetting after calling `f`) ensures
|
||||
// that we always reset to the prior dispatcher even if `f` panics.
|
||||
struct ResetGuard(Option<Dispatch>);
|
||||
impl Drop for ResetGuard {
|
||||
fn drop(&mut self) {
|
||||
if let Some(dispatch) = self.0.take() {
|
||||
let _ = CURRENT_DISPATCH.try_with(|current| {
|
||||
*current.borrow_mut() = dispatch;
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let dispatcher = dispatcher.clone();
|
||||
let prior = CURRENT_DISPATCH.try_with(|current| current.replace(dispatcher));
|
||||
let _guard = ResetGuard(prior.ok());
|
||||
// When this guard is dropped, the default dispatcher will be reset to the
|
||||
// prior default. Using this (rather than simply resetting after calling
|
||||
// `f`) ensures that we always reset to the prior dispatcher even if `f`
|
||||
// panics.
|
||||
let _guard = State::set_default(dispatcher.clone());
|
||||
f()
|
||||
}
|
||||
/// Executes a closure with a reference to this thread's current [dispatcher].
|
||||
///
|
||||
/// Note that calls to `get_default` should not be nested; if this function is
|
||||
/// called while inside of another `get_default`, that closure will be provided
|
||||
/// with `Dispatch::none` rather than the previously set dispatcher.
|
||||
///
|
||||
/// [dispatcher]: ../dispatcher/struct.Dispatch.html
|
||||
pub fn get_default<T, F>(mut f: F) -> T
|
||||
where
|
||||
F: FnMut(&Dispatch) -> T,
|
||||
{
|
||||
CURRENT_DISPATCH
|
||||
.try_with(|current| f(&*current.borrow()))
|
||||
// While this guard is active, additional calls to subscriber functions on
|
||||
// the default dispatcher will not be able to access the dispatch context.
|
||||
// Dropping the guard will allow the dispatch context to be re-entered.
|
||||
struct Entered<'a>(&'a Cell<bool>);
|
||||
impl<'a> Drop for Entered<'a> {
|
||||
#[inline]
|
||||
fn drop(&mut self) {
|
||||
self.0.set(true);
|
||||
}
|
||||
}
|
||||
|
||||
CURRENT_STATE
|
||||
.try_with(|state| {
|
||||
if state.can_enter.replace(false) {
|
||||
let _guard = Entered(&state.can_enter);
|
||||
f(&state.default.borrow())
|
||||
} else {
|
||||
f(&Dispatch::none())
|
||||
}
|
||||
})
|
||||
.unwrap_or_else(|_| f(&Dispatch::none()))
|
||||
}
|
||||
|
||||
@ -70,6 +101,7 @@ pub(crate) struct Registrar(Weak<Subscriber + Send + Sync>);
|
||||
|
||||
impl Dispatch {
|
||||
/// Returns a new `Dispatch` that discards events and spans.
|
||||
#[inline]
|
||||
pub fn none() -> Self {
|
||||
Dispatch {
|
||||
subscriber: Arc::new(NoSubscriber),
|
||||
@ -173,7 +205,7 @@ impl Dispatch {
|
||||
self.subscriber.event(event)
|
||||
}
|
||||
|
||||
/// Records that a span has been entered.
|
||||
/// Records that a span has been can_enter.
|
||||
///
|
||||
/// This calls the [`enter`] function on the [`Subscriber`] that this
|
||||
/// `Dispatch` forwards to.
|
||||
@ -182,7 +214,7 @@ impl Dispatch {
|
||||
/// [`event`]: ../subscriber/trait.Subscriber.html#method.event
|
||||
#[inline]
|
||||
pub fn enter(&self, span: &span::Id) {
|
||||
self.subscriber.enter(span)
|
||||
self.subscriber.enter(span);
|
||||
}
|
||||
|
||||
/// Records that a span has been exited.
|
||||
@ -191,7 +223,7 @@ impl Dispatch {
|
||||
/// that this `Dispatch` forwards to.
|
||||
#[inline]
|
||||
pub fn exit(&self, span: &span::Id) {
|
||||
self.subscriber.exit(span)
|
||||
self.subscriber.exit(span);
|
||||
}
|
||||
|
||||
/// Notifies the subscriber that a [span ID] has been cloned.
|
||||
@ -295,9 +327,50 @@ impl Registrar {
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl State =====
|
||||
|
||||
impl State {
|
||||
/// Replaces the current default dispatcher on this thread with the provided
|
||||
/// dispatcher.Any
|
||||
///
|
||||
/// Dropping the returned `ResetGuard` will reset the default dispatcher to
|
||||
/// the previous value.
|
||||
#[inline]
|
||||
fn set_default(new_dispatch: Dispatch) -> ResetGuard {
|
||||
let prior = CURRENT_STATE
|
||||
.try_with(|state| {
|
||||
state.can_enter.set(true);
|
||||
state.default.replace(new_dispatch)
|
||||
})
|
||||
.ok();
|
||||
ResetGuard(prior)
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl ResetGuard =====
|
||||
|
||||
impl Drop for ResetGuard {
|
||||
#[inline]
|
||||
fn drop(&mut self) {
|
||||
if let Some(dispatch) = self.0.take() {
|
||||
let _ = CURRENT_STATE.try_with(|state| {
|
||||
*state.default.borrow_mut() = dispatch;
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use {
|
||||
callsite::Callsite,
|
||||
metadata::{Kind, Level, Metadata},
|
||||
span,
|
||||
subscriber::{Interest, Subscriber},
|
||||
Event,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn dispatch_is() {
|
||||
@ -310,4 +383,105 @@ mod test {
|
||||
let dispatcher = Dispatch::new(NoSubscriber);
|
||||
assert!(dispatcher.downcast_ref::<NoSubscriber>().is_some());
|
||||
}
|
||||
|
||||
struct TestCallsite;
|
||||
static TEST_CALLSITE: TestCallsite = TestCallsite;
|
||||
static TEST_META: Metadata<'static> = metadata! {
|
||||
name: "test",
|
||||
target: module_path!(),
|
||||
level: Level::DEBUG,
|
||||
fields: &[],
|
||||
callsite: &TEST_CALLSITE,
|
||||
kind: Kind::EVENT
|
||||
};
|
||||
|
||||
impl Callsite for TestCallsite {
|
||||
fn set_interest(&self, _: Interest) {}
|
||||
fn metadata(&self) -> &Metadata {
|
||||
&TEST_META
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn events_dont_infinite_loop() {
|
||||
// This test ensures that an event triggered within a subscriber
|
||||
// won't cause an infinite loop of events.
|
||||
struct TestSubscriber;
|
||||
impl Subscriber for TestSubscriber {
|
||||
fn enabled(&self, _: &Metadata) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn new_span(&self, _: &span::Attributes) -> span::Id {
|
||||
span::Id::from_u64(0xAAAA)
|
||||
}
|
||||
|
||||
fn record(&self, _: &span::Id, _: &span::Record) {}
|
||||
|
||||
fn record_follows_from(&self, _: &span::Id, _: &span::Id) {}
|
||||
|
||||
fn event(&self, _: &Event) {
|
||||
static EVENTS: AtomicUsize = AtomicUsize::new(0);
|
||||
assert_eq!(
|
||||
EVENTS.fetch_add(1, Ordering::Relaxed),
|
||||
0,
|
||||
"event method called twice!"
|
||||
);
|
||||
Event::dispatch(&TEST_META, &TEST_META.fields().value_set(&[]))
|
||||
}
|
||||
|
||||
fn enter(&self, _: &span::Id) {}
|
||||
|
||||
fn exit(&self, _: &span::Id) {}
|
||||
}
|
||||
|
||||
with_default(&Dispatch::new(TestSubscriber), || {
|
||||
Event::dispatch(&TEST_META, &TEST_META.fields().value_set(&[]))
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn spans_dont_infinite_loop() {
|
||||
// This test ensures that a span created within a subscriber
|
||||
// won't cause an infinite loop of new spans.
|
||||
|
||||
fn mk_span() {
|
||||
get_default(|current| {
|
||||
current.new_span(&span::Attributes::new(
|
||||
&TEST_META,
|
||||
&TEST_META.fields().value_set(&[]),
|
||||
))
|
||||
});
|
||||
}
|
||||
|
||||
struct TestSubscriber;
|
||||
impl Subscriber for TestSubscriber {
|
||||
fn enabled(&self, _: &Metadata) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn new_span(&self, _: &span::Attributes) -> span::Id {
|
||||
static NEW_SPANS: AtomicUsize = AtomicUsize::new(0);
|
||||
assert_eq!(
|
||||
NEW_SPANS.fetch_add(1, Ordering::Relaxed),
|
||||
0,
|
||||
"new_span method called twice!"
|
||||
);
|
||||
mk_span();
|
||||
span::Id::from_u64(0xAAAA)
|
||||
}
|
||||
|
||||
fn record(&self, _: &span::Id, _: &span::Record) {}
|
||||
|
||||
fn record_follows_from(&self, _: &span::Id, _: &span::Id) {}
|
||||
|
||||
fn event(&self, _: &Event) {}
|
||||
|
||||
fn enter(&self, _: &span::Id) {}
|
||||
|
||||
fn exit(&self, _: &span::Id) {}
|
||||
}
|
||||
|
||||
with_default(&Dispatch::new(TestSubscriber), || mk_span())
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user