subscriber: avoid double panics in CurrentSpan (#325)

## Motivation

The `ThreadLocal` struct used by `CurrentSpan` currently can cause a
double panic, since it is unclear why the current thread's slot is
unset. The `try_get` function may return `None` because that thread has
not yet been created, _or_ it may return `None` because we are
panicking. However, the code that calls this function doesn't correctly
handle this --- it assumes `None` is returned because the thread has not
yet had its local slot created, and `expect`s that it will exist after
inserting the local slot.

## Solution

I've updated this code to present a more `LocalKey`-like API, and avoid
potential double panics here. Now, when the thread is panicking, the
closure passed into `with` is not invoked.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
Eliza Weisman
2019-09-04 16:10:35 -07:00
committed by GitHub
parent 8bd4d90e78
commit db9929794c
3 changed files with 45 additions and 89 deletions

View File

@@ -171,15 +171,19 @@ impl<S: Subscriber> Layer<S> for Filter {
fn enabled(&self, metadata: &Metadata<'_>, _: Context<'_, S>) -> bool {
let level = metadata.level();
for filter in self.scope.get().iter() {
if filter >= level {
return true;
}
}
self.scope
.with(|scope| {
for filter in scope.iter() {
if filter >= level {
return true;
}
}
// TODO: other filters...
// TODO: other filters...
false
false
})
.unwrap_or(false)
}
fn new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, _: Context<'_, S>) {
@@ -201,13 +205,13 @@ impl<S: Subscriber> Layer<S> for Filter {
// that to allow changing the filter while a span is already entered.
// But that might be much less efficient...
if let Some(span) = try_lock!(self.by_id.read()).get(id) {
self.scope.get().push(span.level());
self.scope.with(|scope| scope.push(span.level()));
}
}
fn on_exit(&self, id: &span::Id, _: Context<'_, S>) {
if self.cares_about_span(id) {
self.scope.get().pop();
self.scope.with(|scope| scope.pop());
}
}

View File

@@ -98,17 +98,19 @@ impl CurrentSpan {
/// Returns the [`Id`](::Id) of the span in which the current thread is
/// executing, or `None` if it is not inside of a span.
pub fn id(&self) -> Option<Id> {
self.current.get().last().cloned()
self.current.with(|current| current.last().cloned())?
}
/// Records that the current thread has entered the span with the provided ID.
pub fn enter(&self, span: Id) {
self.current.get().push(span)
self.current.with(|current| current.push(span));
}
/// Records that the current thread has exited a span.
pub fn exit(&self) {
self.current.get().pop();
self.current.with(|current| {
let _ = current.pop();
});
}
}

View File

@@ -1,14 +1,11 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{
cell::{Cell, UnsafeCell},
fmt,
marker::PhantomData,
};
use std::{
ops::{Deref, DerefMut},
sync::atomic::{AtomicUsize, Ordering},
};
use crossbeam_utils::sync::{ShardedLock, ShardedLockReadGuard};
use crossbeam_utils::sync::ShardedLock;
pub(crate) struct Local<T> {
inner: ShardedLock<Inner<T>>,
@@ -16,14 +13,6 @@ pub(crate) struct Local<T> {
type Inner<T> = Vec<Option<UnsafeCell<T>>>;
pub(crate) struct LocalGuard<'a, T> {
inner: *mut T,
/// Hold the read guard for the lifetime of this guard. We're not actually
/// accessing the data behind that guard, but holding the read lock will
/// keep another thread from reallocating the vec.
_lock: ShardedLockReadGuard<'a, Inner<T>>,
}
/// Uniquely identifies a thread.
#[repr(transparent)]
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
@@ -46,26 +35,25 @@ impl<T> Local<T> {
}
}
/// Returns a `LocalGuard` that dereferences to the local data for the
/// current thread. If no local data exists, the provided function is
/// invoked and the result is inserted.
pub(crate) fn get_or_else<'a>(&'a self, new: impl FnOnce() -> T) -> LocalGuard<'a, T> {
pub(crate) fn with_or_else<O>(
&self,
new: impl FnOnce() -> T,
f: impl FnOnce(&mut T) -> O,
) -> Option<O> {
let i = Id::current().as_usize();
if let Some(guard) = self.try_get(i) {
guard
} else {
self.new_thread(i, new);
self.try_get(i).expect("data was just inserted")
}
let mut f = Some(f);
self.try_with_index(i, |item| f.take().expect("called twice")(item))
.or_else(move || {
self.new_thread(i, new);
self.try_with_index(i, |item| f.take().expect("called twice")(item))
})
}
// Returns a `LocalGuard` for the local data for this thread, or `None` if
// no local data has been inserted.
fn try_get<'a>(&'a self, i: usize) -> Option<LocalGuard<'a, T>> {
fn try_with_index<O>(&self, i: usize, f: impl FnOnce(&mut T) -> O) -> Option<O> {
let lock = try_lock!(self.inner.read(), else return None);
let slot = lock.get(i)?.as_ref()?;
let inner = slot.get();
Some(LocalGuard { inner, _lock: lock })
let item = unsafe { &mut *slot.get() };
Some(f(item))
}
#[cold]
@@ -78,10 +66,9 @@ impl<T> Local<T> {
}
impl<T: Default> Local<T> {
/// Returns a `LocalGuard` that dereferences to the local data for the
/// current thread. If no local data exists, the default value is inserted.
pub(crate) fn get<'a>(&'a self) -> LocalGuard<'a, T> {
self.get_or_else(T::default)
#[inline]
pub(crate) fn with<O>(&self, f: impl FnOnce(&mut T) -> O) -> Option<O> {
self.with_or_else(T::default, f)
}
}
@@ -90,55 +77,18 @@ unsafe impl<T> Sync for Local<T> {}
impl<T: fmt::Debug> fmt::Debug for Local<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let id = Id::current();
match self.try_get(id.as_usize()) {
Some(local) => f
.debug_struct("Local")
self.try_with_index(id.as_usize(), |local| {
f.debug_struct("Local")
.field("thread", &id)
.field("local", &*local)
.finish(),
None => f
.debug_struct("Local")
.finish()
})
.unwrap_or_else(|| {
f.debug_struct("Local")
.field("thread", &id)
.field("local", &format_args!("<uninitialized>"))
.finish(),
}
}
}
// === impl LocalGuard ===
impl<'a, T> Deref for LocalGuard<'a, T> {
type Target = T;
#[inline]
fn deref(&self) -> &T {
unsafe {
// this is safe, as the `Local` only allows access to each slot
// from a single thread.
&*self.inner
}
}
}
impl<'a, T> DerefMut for LocalGuard<'a, T> {
#[inline]
fn deref_mut(&mut self) -> &mut T {
unsafe {
// this is safe, as the `Local` only allows access to each slot
// from a single thread.
&mut *self.inner
}
}
}
impl<'a, T: fmt::Debug> fmt::Debug for LocalGuard<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.deref().fmt(f)
}
}
impl<'a, T: fmt::Display> fmt::Display for LocalGuard<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.deref().fmt(f)
.finish()
})
}
}