trace-core: add a function to rebuild cached interest (#1039)

## Motivation

Currently, `tokio-trace-core` permits `Subscriber`s to indicate that
they are "always", "sometimes", or "never" interested in a particular
callsite. When "always" or "never" is returned, then the interest is
cached and the subscriber will not be asked again about that callsite.
This is much more efficient than requiring the filter to be re-evaluated
every time the callsite is hit.

However, if a subscriber wishes to change its filter configuration
dynamically at runtime, it cannot benefit from this caching. Instead, it
must always return `Interest::sometimes`.  Even when filters change very
infrequently, they must still always be re-evaluated every time.

In order to support a use-case where subscribers may change their filter
configuration at runtime (e.g. tokio-rs/tokio-trace-nursery#42),
but do so infrequently, we should introducing a new function to
invalidate the cached interest.

## Solution

This branch adds a new function in the `callsite` module, called
`rebuild_interest_cache`, that will invalidate and rebuild all cached
interest.

## Breaking Change

In order to fix a race condition that could occur when rebuilding
interest caches using `clear_interest` and `add_interest`, these methods
have been replaced by a new `set_interest` method. `set_interest` should
have the semantics of atomically replacing the previous cached interest,
so that the callsite does not enter a temporary state where it has no
interest.

Closes #1038

Co-Authored-By: yaahallo <jlusby42@gmail.com>
This commit is contained in:
Jane Lusby 2019-04-10 13:51:05 -07:00 committed by Eliza Weisman
parent 7ae010f0f3
commit b4fe517a16
12 changed files with 321 additions and 237 deletions

View File

@ -15,7 +15,7 @@ keywords = ["logging", "tracing"]
publish = false
[dependencies]
tokio-trace-core = "0.1"
tokio-trace-core = { path = "./tokio-trace-core" }
log = { version = "0.4", optional = true }
cfg-if = "0.1.7"

View File

@ -1274,28 +1274,15 @@ macro_rules! callsite {
}
}
impl callsite::Callsite for MyCallsite {
fn add_interest(&self, interest: Interest) {
let current_interest = self.interest();
fn set_interest(&self, interest: Interest) {
let interest = match () {
// If the added interest is `never()`, don't change anything
// — either a different subscriber added a higher
// interest, which we want to preserve, or the interest is 0
// anyway (as it's initialized to 0).
_ if interest.is_never() => return,
// If the interest is `sometimes()`, that overwrites a `never()`
// interest, but doesn't downgrade an `always()` interest.
_ if interest.is_sometimes() && current_interest.is_never() => 1,
// If the interest is `always()`, we overwrite the current
// interest, as always() is the highest interest level and
// should take precedent.
_ if interest.is_never() => 0,
_ if interest.is_always() => 2,
_ => return,
_ => 1,
};
INTEREST.store(interest, Ordering::Relaxed);
}
fn clear_interest(&self) {
INTEREST.store(0, Ordering::Relaxed);
INTEREST.store(interest, Ordering::SeqCst);
}
fn metadata(&self) -> &Metadata {
&META
}

View File

@ -0,0 +1,64 @@
// Tests that depend on a count of the number of times their filter is evaluated
// can't exist in the same file with other tests that add subscribers to the
// registry. The registry was changed so that each time a new dispatcher is
// added all filters are re-evaluated. The tests being run only in separate
// threads with shared global state lets them interfere with eachother
#[macro_use]
extern crate tokio_trace;
mod support;
use self::support::*;
use tokio_trace::subscriber::with_default;
use tokio_trace::Level;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
#[test]
fn filter_caching_is_lexically_scoped() {
pub fn my_great_function() -> bool {
span!(Level::TRACE, "emily").enter(|| true)
}
pub fn my_other_function() -> bool {
span!(Level::TRACE, "frank").enter(|| true)
}
let count = Arc::new(AtomicUsize::new(0));
let count2 = count.clone();
let subscriber = subscriber::mock()
.with_filter(move |meta| match meta.name {
"emily" | "frank" => {
count2.fetch_add(1, Ordering::Relaxed);
true
}
_ => false,
})
.run();
with_default(subscriber, || {
// Call the function once. The filter should be re-evaluated.
assert!(my_great_function());
assert_eq!(count.load(Ordering::Relaxed), 1);
// Call the function again. The cached result should be used.
assert!(my_great_function());
assert_eq!(count.load(Ordering::Relaxed), 1);
assert!(my_other_function());
assert_eq!(count.load(Ordering::Relaxed), 2);
assert!(my_great_function());
assert_eq!(count.load(Ordering::Relaxed), 2);
assert!(my_other_function());
assert_eq!(count.load(Ordering::Relaxed), 2);
assert!(my_great_function());
assert_eq!(count.load(Ordering::Relaxed), 2);
});
}

View File

@ -0,0 +1,69 @@
// Tests that depend on a count of the number of times their filter is evaluated
// cant exist in the same file with other tests that add subscribers to the
// registry. The registry was changed so that each time a new dispatcher is
// added all filters are re-evaluated. The tests being run only in separate
// threads with shared global state lets them interfere with eachother
#[macro_use]
extern crate tokio_trace;
mod support;
use self::support::*;
use tokio_trace::subscriber::with_default;
use tokio_trace::Level;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
#[test]
fn filters_are_not_reevaluated_for_the_same_span() {
// Asserts that the `span!` macro caches the result of calling
// `Subscriber::enabled` for each span.
let alice_count = Arc::new(AtomicUsize::new(0));
let bob_count = Arc::new(AtomicUsize::new(0));
let alice_count2 = alice_count.clone();
let bob_count2 = bob_count.clone();
let (subscriber, handle) = subscriber::mock()
.with_filter(move |meta| match meta.name {
"alice" => {
alice_count2.fetch_add(1, Ordering::Relaxed);
false
}
"bob" => {
bob_count2.fetch_add(1, Ordering::Relaxed);
true
}
_ => false,
})
.run_with_handle();
with_default(subscriber, move || {
// Enter "alice" and then "bob". The dispatcher expects to see "bob" but
// not "alice."
let alice = span!(Level::TRACE, "alice");
let bob = alice.enter(|| {
let bob = span!(Level::TRACE, "bob");
bob.enter(|| ());
bob
});
// The filter should have seen each span a single time.
assert_eq!(alice_count.load(Ordering::Relaxed), 1);
assert_eq!(bob_count.load(Ordering::Relaxed), 1);
alice.enter(|| bob.enter(|| {}));
// The subscriber should see "bob" again, but the filter should not have
// been called.
assert_eq!(alice_count.load(Ordering::Relaxed), 1);
assert_eq!(bob_count.load(Ordering::Relaxed), 1);
bob.enter(|| {});
assert_eq!(alice_count.load(Ordering::Relaxed), 1);
assert_eq!(bob_count.load(Ordering::Relaxed), 1);
});
handle.assert_finished();
}

View File

@ -0,0 +1,80 @@
// Tests that depend on a count of the number of times their filter is evaluated
// cant exist in the same file with other tests that add subscribers to the
// registry. The registry was changed so that each time a new dispatcher is
// added all filters are re-evaluated. The tests being run only in separate
// threads with shared global state lets them interfere with eachother
#[macro_use]
extern crate tokio_trace;
mod support;
use self::support::*;
use tokio_trace::subscriber::with_default;
use tokio_trace::Level;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
#[test]
fn filters_are_reevaluated_for_different_call_sites() {
// Asserts that the `span!` macro caches the result of calling
// `Subscriber::enabled` for each span.
let charlie_count = Arc::new(AtomicUsize::new(0));
let dave_count = Arc::new(AtomicUsize::new(0));
let charlie_count2 = charlie_count.clone();
let dave_count2 = dave_count.clone();
let subscriber = subscriber::mock()
.with_filter(move |meta| {
println!("Filter: {:?}", meta.name);
match meta.name {
"charlie" => {
charlie_count2.fetch_add(1, Ordering::Relaxed);
false
}
"dave" => {
dave_count2.fetch_add(1, Ordering::Relaxed);
true
}
_ => false,
}
})
.run();
with_default(subscriber, move || {
// Enter "charlie" and then "dave". The dispatcher expects to see "dave" but
// not "charlie."
let charlie = span!(Level::TRACE, "charlie");
let dave = charlie.enter(|| {
let dave = span!(Level::TRACE, "dave");
dave.enter(|| {});
dave
});
// The filter should have seen each span a single time.
assert_eq!(charlie_count.load(Ordering::Relaxed), 1);
assert_eq!(dave_count.load(Ordering::Relaxed), 1);
charlie.enter(|| dave.enter(|| {}));
// The subscriber should see "dave" again, but the filter should not have
// been called.
assert_eq!(charlie_count.load(Ordering::Relaxed), 1);
assert_eq!(dave_count.load(Ordering::Relaxed), 1);
// A different span with the same name has a different call site, so it
// should cause the filter to be reapplied.
let charlie2 = span!(Level::TRACE, "charlie");
charlie.enter(|| {});
assert_eq!(charlie_count.load(Ordering::Relaxed), 2);
assert_eq!(dave_count.load(Ordering::Relaxed), 1);
// But, the filter should not be re-evaluated for the new "charlie" span
// when it is re-entered.
charlie2.enter(|| span!(Level::TRACE, "dave").enter(|| {}));
assert_eq!(charlie_count.load(Ordering::Relaxed), 2);
assert_eq!(dave_count.load(Ordering::Relaxed), 2);
});
}

View File

@ -1,171 +0,0 @@
#[macro_use]
extern crate tokio_trace;
mod support;
use self::support::*;
use tokio_trace::subscriber::with_default;
use tokio_trace::Level;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
#[test]
fn filters_are_not_reevaluated_for_the_same_span() {
// Asserts that the `span!` macro caches the result of calling
// `Subscriber::enabled` for each span.
let alice_count = Arc::new(AtomicUsize::new(0));
let bob_count = Arc::new(AtomicUsize::new(0));
let alice_count2 = alice_count.clone();
let bob_count2 = bob_count.clone();
let (subscriber, handle) = subscriber::mock()
.with_filter(move |meta| match meta.name {
"alice" => {
alice_count2.fetch_add(1, Ordering::Relaxed);
false
}
"bob" => {
bob_count2.fetch_add(1, Ordering::Relaxed);
true
}
_ => false,
})
.run_with_handle();
with_default(subscriber, move || {
// Enter "alice" and then "bob". The dispatcher expects to see "bob" but
// not "alice."
let alice = span!(Level::TRACE, "alice");
let bob = alice.enter(|| {
let bob = span!(Level::TRACE, "bob");
bob.enter(|| ());
bob
});
// The filter should have seen each span a single time.
assert_eq!(alice_count.load(Ordering::Relaxed), 1);
assert_eq!(bob_count.load(Ordering::Relaxed), 1);
alice.enter(|| bob.enter(|| {}));
// The subscriber should see "bob" again, but the filter should not have
// been called.
assert_eq!(alice_count.load(Ordering::Relaxed), 1);
assert_eq!(bob_count.load(Ordering::Relaxed), 1);
bob.enter(|| {});
assert_eq!(alice_count.load(Ordering::Relaxed), 1);
assert_eq!(bob_count.load(Ordering::Relaxed), 1);
});
handle.assert_finished();
}
#[test]
fn filters_are_reevaluated_for_different_call_sites() {
// Asserts that the `span!` macro caches the result of calling
// `Subscriber::enabled` for each span.
let charlie_count = Arc::new(AtomicUsize::new(0));
let dave_count = Arc::new(AtomicUsize::new(0));
let charlie_count2 = charlie_count.clone();
let dave_count2 = dave_count.clone();
let subscriber = subscriber::mock()
.with_filter(move |meta| {
println!("Filter: {:?}", meta.name);
match meta.name {
"charlie" => {
charlie_count2.fetch_add(1, Ordering::Relaxed);
false
}
"dave" => {
dave_count2.fetch_add(1, Ordering::Relaxed);
true
}
_ => false,
}
})
.run();
with_default(subscriber, move || {
// Enter "charlie" and then "dave". The dispatcher expects to see "dave" but
// not "charlie."
let charlie = span!(Level::TRACE, "charlie");
let dave = charlie.enter(|| {
let dave = span!(Level::TRACE, "dave");
dave.enter(|| {});
dave
});
// The filter should have seen each span a single time.
assert_eq!(charlie_count.load(Ordering::Relaxed), 1);
assert_eq!(dave_count.load(Ordering::Relaxed), 1);
charlie.enter(|| dave.enter(|| {}));
// The subscriber should see "dave" again, but the filter should not have
// been called.
assert_eq!(charlie_count.load(Ordering::Relaxed), 1);
assert_eq!(dave_count.load(Ordering::Relaxed), 1);
// A different span with the same name has a different call site, so it
// should cause the filter to be reapplied.
let charlie2 = span!(Level::TRACE, "charlie");
charlie.enter(|| {});
assert_eq!(charlie_count.load(Ordering::Relaxed), 2);
assert_eq!(dave_count.load(Ordering::Relaxed), 1);
// But, the filter should not be re-evaluated for the new "charlie" span
// when it is re-entered.
charlie2.enter(|| span!(Level::TRACE, "dave").enter(|| {}));
assert_eq!(charlie_count.load(Ordering::Relaxed), 2);
assert_eq!(dave_count.load(Ordering::Relaxed), 2);
});
}
#[test]
fn filter_caching_is_lexically_scoped() {
pub fn my_great_function() -> bool {
span!(Level::TRACE, "emily").enter(|| true)
}
pub fn my_other_function() -> bool {
span!(Level::TRACE, "frank").enter(|| true)
}
let count = Arc::new(AtomicUsize::new(0));
let count2 = count.clone();
let subscriber = subscriber::mock()
.with_filter(move |meta| match meta.name {
"emily" | "frank" => {
count2.fetch_add(1, Ordering::Relaxed);
true
}
_ => false,
})
.run();
with_default(subscriber, || {
// Call the function once. The filter should be re-evaluated.
assert!(my_great_function());
assert_eq!(count.load(Ordering::Relaxed), 1);
// Call the function again. The cached result should be used.
assert!(my_great_function());
assert_eq!(count.load(Ordering::Relaxed), 1);
assert!(my_other_function());
assert_eq!(count.load(Ordering::Relaxed), 2);
assert!(my_great_function());
assert_eq!(count.load(Ordering::Relaxed), 2);
assert!(my_other_function());
assert_eq!(count.load(Ordering::Relaxed), 2);
assert!(my_great_function());
assert_eq!(count.load(Ordering::Relaxed), 2);
});
}

View File

@ -7,7 +7,7 @@ name = "tokio-trace-core"
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.1.0"
version = "0.2.0"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"

View File

@ -7,7 +7,7 @@ use std::{
sync::Mutex,
};
use {
dispatcher::{self, Dispatch},
dispatcher::{self, Dispatch, Registrar},
subscriber::Interest,
Metadata,
};
@ -24,23 +24,39 @@ struct Registry {
dispatchers: Vec<dispatcher::Registrar>,
}
/// Trait implemented by callsites.
pub trait Callsite: Sync {
/// Adds the [`Interest`] returned by [registering] the callsite with a
/// [dispatcher].
///
/// If the interest is greater than or equal to the callsite's current
/// interest, this should change whether or not the callsite is enabled.
///
/// [`Interest`]: ../subscriber/struct.Interest.html
/// [registering]: ../subscriber/trait.Subscriber.html#method.register_callsite
/// [dispatcher]: ../dispatcher/struct.Dispatch.html
fn add_interest(&self, interest: Interest);
impl Registry {
fn rebuild_callsite_interest(&self, callsite: &'static Callsite) {
let meta = callsite.metadata();
/// Remove _all_ [`Interest`] from the callsite, disabling it.
let mut interest = Interest::never();
for registrar in &self.dispatchers {
if let Some(sub_interest) = registrar.try_register(meta) {
interest = interest.and(sub_interest);
}
}
callsite.set_interest(interest)
}
fn rebuild_interest(&mut self) {
self.dispatchers.retain(Registrar::is_alive);
self.callsites.iter().for_each(|&callsite| {
self.rebuild_callsite_interest(callsite);
});
}
}
/// Trait implemented by callsites.
///
/// These functions are only intended to be called by the [`Registry`] which
/// correctly handles determining the common interest between all subscribers.
pub trait Callsite: Sync {
/// Sets the [`Interest`] for this callsite.
///
/// [`Interest`]: ../subscriber/struct.Interest.html
fn clear_interest(&self);
fn set_interest(&self, interest: Interest);
/// Returns the [metadata] associated with the callsite.
///
@ -67,34 +83,37 @@ pub struct Identifier(
pub &'static Callsite,
);
/// Clear and reregister interest on every [`Callsite`]
///
/// This function is intended for runtime reconfiguration of filters on traces
/// when the filter recalculation is much less frequent than trace events are.
/// The alternative is to have the [`Subscriber`] that supports runtime
/// reconfiguration of filters always return [`Interest::sometimes()`] so that
/// [`enabled`] is evaluated for every event.
///
/// [`Callsite`]: ../callsite/trait.Callsite.html
/// [`enabled`]: ../subscriber/trait.Subscriber.html#tymethod.enabled
/// [`Interest::sometimes()`]: ../subscriber/struct.Interest.html#method.sometimes
/// [`Subscriber`]: ../subscriber/trait.Subscriber.html
pub fn rebuild_interest_cache() {
let mut registry = REGISTRY.lock().unwrap();
registry.rebuild_interest();
}
/// Register a new `Callsite` with the global registry.
///
/// This should be called once per callsite after the callsite has been
/// constructed.
pub fn register(callsite: &'static Callsite) {
let mut registry = REGISTRY.lock().unwrap();
let meta = callsite.metadata();
registry.dispatchers.retain(|registrar| {
match registrar.try_register(meta) {
Some(interest) => {
callsite.add_interest(interest);
true
}
// TODO: if the dispatcher has been dropped, should we invalidate
// any callsites that it previously enabled?
None => false,
}
});
registry.rebuild_callsite_interest(callsite);
registry.callsites.push(callsite);
}
pub(crate) fn register_dispatch(dispatch: &Dispatch) {
let mut registry = REGISTRY.lock().unwrap();
registry.dispatchers.push(dispatch.registrar());
for callsite in &registry.callsites {
let interest = dispatch.register_callsite(callsite.metadata());
callsite.add_interest(interest);
}
registry.rebuild_interest();
}
// ===== impl Identifier =====

View File

@ -289,6 +289,10 @@ impl Registrar {
pub(crate) fn try_register(&self, metadata: &Metadata) -> Option<subscriber::Interest> {
self.0.upgrade().map(|s| s.register_callsite(metadata))
}
pub(crate) fn is_alive(&self) -> bool {
self.0.upgrade().is_some()
}
}
#[cfg(test)]

View File

@ -686,8 +686,9 @@ mod test {
};
impl ::callsite::Callsite for TestCallsite1 {
fn add_interest(&self, _: ::subscriber::Interest) {}
fn clear_interest(&self) {}
fn set_interest(&self, _: ::subscriber::Interest) {
unimplemented!()
}
fn metadata(&self) -> &Metadata {
&TEST_META_1
@ -705,8 +706,9 @@ mod test {
};
impl ::callsite::Callsite for TestCallsite2 {
fn add_interest(&self, _: ::subscriber::Interest) {}
fn clear_interest(&self) {}
fn set_interest(&self, _: ::subscriber::Interest) {
unimplemented!()
}
fn metadata(&self) -> &Metadata {
&TEST_META_2

View File

@ -65,8 +65,7 @@ extern crate lazy_static;
/// // ...
/// }
/// impl callsite::Callsite for MyCallsite {
/// # fn add_interest(&self, _: Interest) { unimplemented!() }
/// # fn clear_interest(&self) {}
/// # fn set_interest(&self, _: Interest) { unimplemented!() }
/// # fn metadata(&self) -> &Metadata { unimplemented!() }
/// // ...
/// }
@ -103,8 +102,7 @@ macro_rules! identify_callsite {
/// # fn main() {
/// # pub struct MyCallsite { }
/// # impl Callsite for MyCallsite {
/// # fn add_interest(&self, _: Interest) { unimplemented!() }
/// # fn clear_interest(&self) {}
/// # fn set_interest(&self, _: Interest) { unimplemented!() }
/// # fn metadata(&self) -> &Metadata { unimplemented!() }
/// # }
/// #

View File

@ -19,7 +19,7 @@ use std::{
/// - Registering new spans as they are created, and providing them with span
/// IDs. Implicitly, this means the subscriber may determine the strategy for
/// determining span equality.
/// - Visiting the attachment of field values and follows-from annotations to
/// - Recording the attachment of field values and follows-from annotations to
/// spans.
/// - Filtering spans and events, and determining when those filters must be
/// invalidated.
@ -50,9 +50,9 @@ pub trait Subscriber: 'static {
/// indicate different interests, or to implement behaviour that should run
/// once for every callsite.
///
/// This function is guaranteed to be called exactly once per callsite on
/// This function is guaranteed to be called at least once per callsite on
/// every active subscriber. The subscriber may store the keys to fields it
/// cares in order to reduce the cost of accessing fields by name,
/// cares about in order to reduce the cost of accessing fields by name,
/// preallocate storage for that callsite, or perform any other actions it
/// wishes to perform once for each callsite.
///
@ -79,6 +79,10 @@ pub trait Subscriber: 'static {
/// set of metadata. Thus, the counter will not be incremented, and the span
/// or event that correspands to the metadata will never be `enabled`.
///
/// `Subscriber`s that need to change their filters occasionally should call
/// [`rebuild_interest_cache`] to re-evaluate `register_callsite` for all
/// callsites.
///
/// Similarly, if a `Subscriber` has a filtering strategy that can be
/// changed dynamically at runtime, it would need to re-evaluate that filter
/// if the cached results have changed.
@ -91,14 +95,19 @@ pub trait Subscriber: 'static {
/// return `Interest::Never`, as a new subscriber may be added that _is_
/// interested.
///
/// **Note**: If a subscriber returns `Interest::never` for a particular
/// callsite, it _may_ still see spans and events originating from that
/// callsite, if another subscriber expressed interest in it.
/// # Notes
/// This function may be called again when a new subscriber is created or
/// when the registry is invalidated.
///
/// If a subscriber returns `Interest::never` for a particular callsite, it
/// _may_ still see spans and events originating from that callsite, if
/// another subscriber expressed interest in it.
///
/// [filter]: #method.enabled
/// [metadata]: ../metadata/struct.Metadata.html
/// [`Interest`]: struct.Interest.html
/// [`enabled`]: #method.enabled
/// [`rebuild_interest_cache`]: ../callsite/fn.rebuild_interest_cache.html
fn register_callsite(&self, metadata: &Metadata) -> Interest {
match self.enabled(metadata) {
true => Interest::always(),
@ -396,7 +405,7 @@ impl Interest {
///
/// If all active subscribers are `sometimes` or `never` interested in a
/// callsite, the currently active subscriber will be asked to filter that
/// callsite every time it creates a span. This will be the case until a
/// callsite every time it creates a span. This will be the case until a new
/// subscriber expresses that it is `always` interested in the callsite.
#[inline]
pub fn sometimes() -> Self {
@ -442,4 +451,27 @@ impl Interest {
_ => false,
}
}
/// Returns the common interest between these two Interests.
///
/// The common interest is defined as the least restrictive, so if one
/// interest is `never` and the other is `always` the common interest is
/// `always`.
pub(crate) fn and(self, rhs: Interest) -> Self {
match rhs.0 {
// If the added interest is `never()`, don't change anything —
// either a different subscriber added a higher interest, which we
// want to preserve, or the interest is 0 anyway (as it's
// initialized to 0).
InterestKind::Never => self,
// If the interest is `sometimes()`, that overwrites a `never()`
// interest, but doesn't downgrade an `always()` interest.
InterestKind::Sometimes if self.0 == InterestKind::Never => rhs,
// If the interest is `always()`, we overwrite the current interest,
// as always() is the highest interest level and should take
// precedent.
InterestKind::Always => rhs,
_ => self,
}
}
}