diff --git a/tracing-subscriber/src/layer.rs b/tracing-subscriber/src/layer.rs index a56fad9b..53e6ef93 100644 --- a/tracing-subscriber/src/layer.rs +++ b/tracing-subscriber/src/layer.rs @@ -626,11 +626,11 @@ impl<'a, S: Subscriber + Registry> Context<'a, S> { self.subscriber.as_ref()?.span(id) } - fn span_mut(&self, id: &span::Id) -> Option { + fn span_mut(&self, id: &span::Id) -> Option { self.subscriber.as_ref()?.span_mut(id) } - fn parents<'a>(&'a self, span: &span::Id) -> Parents<'a, S> + fn parents(&'a self, span: &span::Id) -> Parents<'a, S> where S: Sized, { diff --git a/tracing-subscriber/src/registry/sharded.rs b/tracing-subscriber/src/registry/sharded.rs new file mode 100644 index 00000000..8c870b1c --- /dev/null +++ b/tracing-subscriber/src/registry/sharded.rs @@ -0,0 +1,285 @@ +use hashbrown::HashMap; +use tracing_core::span::Id; +use std::{ + mem, + thread, + sync::{ + atomic::{AtomicUsize, Ordering}, + PoisonError, + }, + cell::{RefCell, Cell, Ref, RefMut, UnsafeCell}, + fmt, +}; +use parking_lot::{ReentrantMutex, ReentrantMutexGuard, MappedReentrantMutexGuard}; +use crossbeam_utils::sync::{ShardedLock, ShardedLockReadGuard}; +use owning_ref::OwningHandle; + +pub struct Registry { + shards: ShardedLock>, +} + +#[derive(Copy, Clone, Hash, PartialEq, Eq, Debug)] +struct Thread { + id: usize, +} + +struct Shards(HashMap>); + +struct Shard { + spans: ReentrantMutex>>, +} + +pub struct Span<'a, T> { + inner: MappedReentrantMutexGuard<'a, Ref<'a, T>>, +} + +pub struct SpanMut<'a, T> { + inner: MappedReentrantMutexGuard<'a, RefMut<'a, T>>, +} + +#[derive(Debug)] +enum Error { + Poisoned, + BadInsert, +} + +type Spans = HashMap>; + +fn handle_err(result: Result) -> Option { + match result { + Ok(e) => Some(e), + Err(e) if thread::panicking() => { + println!("{}", e); + None + }, + Err(e) => panic!("{}", e), + } +} + +impl Registry { + fn with_local<'a, I>(&'a self, mut f: impl FnOnce(&'a Shard) -> I) -> Result { + // fast path --- the shard already exists + let thread = Thread::current(); + let mut f = Some(f); + + if let Some(r) = self.shards.read()? + .with_shard(&thread, &mut f) + { + Ok(r) + } else { + // slow path --- need to insert a shard. + self.with_new_local(&thread, &mut f) + } + } + + #[cold] + fn with_new_local<'a, I>(&'a self, thread: &Thread, f: &mut Option)-> I>,) -> Result { + self.shards.write()? + .new_shard_for(thread.clone()) + .with_shard(&thread, &mut f).ok_or(Error::BadInsert) + } + + pub fn span(&self, id: &Id) -> Option> { + let local = self.with_local(|shard| { shard.span(id) }); + let local = handle_err(local)?; + match local { + Some(slot) => { + let inner = MappedReentrantMutexGuard::try_map(slot, |slot| slot.as_ref()).ok()?; + return Some(Span { inner }); + }, + None => { + // TODO: steal + } + } + + None + } + + pub fn span_mut<'a>(&'a self, id: &Id) -> Option> { + // let res = self.with_local(|shard| { + // shard.span(id).map(|inner| Ref { inner }) + // }); + // handle_err(res)? + unimplemented!() + } + + pub fn with_span(&self, id: &Id, f: impl FnOnce(&mut T) -> I) -> Option { + // let mut f = Some(f); + // let res = self.with_shard(|shard| { + // shard.get_mut(id).and_then(Slot::get_mut).map(|span| { + // let mut f = f.take().expect("called twice!"); + // f(span) + // }) + // }); + // handle_poison(res)? + + // TODO: steal + unimplemented!() + } + + pub fn insert(&self, id: Id, span: T) -> &Self { + let ok = self.with_local(move |shard| { shard.insert(id, span) }); + if !thread::panicking() { + ok.expect("poisoned"); + } + + self + } + + pub fn new() -> Self { + Self { + shards: ShardedLock::new(Shards(HashMap::new())) + } + } +} + +impl Shards { + fn with_shard<'a, I>( + &'a self, + thread: &Thread, + f: &mut Option)-> I>, + ) -> Option { + let shard = self.0.get(thread)?; + let mut f = match f.take() { + None if thread::panicking() => { + println!("with_shard: closure called twice; this is a bug"); + return None; + }, + None => panic!("with_shard: closure called twice; this is a bug"), + Some(f) => f, + }; + Some(f(&*shard)) + } + + fn new_shard_for(&mut self, thread: Thread) -> &mut Self { + self.0.insert(thread, Shard::new()); + self + } +} + +impl Shard { + fn new() -> Self { + Self { + spans: ReentrantMutex::new(UnsafeCell::new(HashMap::new())) + } + } + + fn insert(&self, id: Id, span: T) -> Option { + let guard = self.spans.lock(); + let spans = unsafe { + // this is safe as the mutex ensures that the map is not mutated + // concurrently, and the mutable ref will not leave this function. + &mut *(guard.get()) + }; + spans.insert(id, RefCell::new(span)) + } + + // fn spans_mut<'a>(&'a self) -> MappedReentrantMutexGuard<'a, RefMut<'a, Spans>> { + // let guard = self.spans.lock(); + // ReentrantMutexGuard::map(guard, RefCell::borrow_mut) + // } + fn span<'a>(&'a self, id: &Id) -> Option>> { + let guard = self.spans.lock(); + ReentrantMutexGuard::try_map( + guard, + move |spans| { + let spans = unsafe { &mut *(guard.get()) }; + spans.get_mut(id) + } + ).ok() + } + + fn try_steal(&self, id: &Id) -> Option> { + let guard = self.spans.lock(); + let spans = unsafe { + // this is safe as the mutex ensures that the map is not mutated + // concurrently, and the mutable ref will not leave this function. + &mut *(guard.get()) + }; + spans.insert(id.clone(), Slot::Stolen(Thread::current())) + } +} + +impl Thread { + fn current() -> Self { + static NEXT: AtomicUsize = AtomicUsize::new(0); + thread_local! { + static MY_ID: Cell> = Cell::new(None); + } + MY_ID.with(|my_id| if let Some(id) = my_id.get() { + Thread { + id + } + } else { + let id = NEXT.fetch_add(1, Ordering::SeqCst); + my_id.set(Some(id)); + Thread { + id + } + }) + } +} + +impl Slot { + + fn as_ref<'a>(&'a self) -> Option> { + match self { + Slot::Present(ref span) => Some(span.borrow()), + _ => None, + } + } + + fn as_mut<'a>(&'a self) -> Option> { + match self { + Slot::Present(ref span) => Some(span.borrow_mut()), + _ => None, + } + } + + fn into_option(self) -> Option { + match self { + Slot::Present(span) => Some(span.into_inner()), + _ => None, + } + } +} + + +impl From> for Error { + fn from(_: PoisonError) -> Self { + Error::Poisoned + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Error::Poisoned => + "registry poisoned: another thread panicked inside".fmt(f), + Error::BadInsert => + "new thread was inserted but did not exist; this is a bug".fmt(f), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn basically_works() { + let registry: Registry = Registry::new(); + registry + .insert(Id::from_u64(1), 1) + .insert(Id::from_u64(2), 2); + + assert_eq!(registry.span(&Id::from_u64(1)), Some(1)); + assert_eq!(registry.span(&Id::from_u64(2)), Some(2)); + + registry.insert(Id::from_u64(3), 3); + + assert_eq!(registry.span(&Id::from_u64(1)), Some(1)); + assert_eq!(registry.span(&Id::from_u64(2)), Some(2)); + assert_eq!(registry.span(&Id::from_u64(3)), Some(3)); + } +}