appender: impl MakeWriter for RollingFileAppender (#1760)

## Motivation

Currently, `tracing-appender`'s `RollingFileAppender` does not implement
the `MakeWriter` trait. This means it can only be used by either
wrapping it in `NonBlocking`, or by wrapping it in a `Mutex`. However,
this shouldn't be strictly necessary, as `&File` implements `io::Write`.
It should thus only be necessary to introduce locking when we are in the
process of _rotating_ the log file.

## Solution

This branch adds a `MakeWriter` implementation for
`RollingFileAppender`. This is done by moving the file itself inside of
an `RwLock`, so that a read lock is acquired to write to the file. This
allows multiple threads to write to the file without contention. When
the file needs to be rolled, the rolling thread acquires the write lock
to replace the file. Acquiring the write lock is guarded by an atomic
CAS on the timestamp, so that only a single thread will try to roll the
file. This prevents other threads from immediately rolling the file
_again_ when the write lock is released.

I...should probably write tests for that, though.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
Eliza Weisman 2021-12-17 14:50:51 -08:00
parent f8d8ea3a09
commit fc52d45a0f
6 changed files with 286 additions and 126 deletions

View File

@ -0,0 +1,32 @@
//! This example demonstrates the use of multiple files with
//! `tracing-appender`'s `RollingFileAppender`
//!
use tracing_appender::rolling;
use tracing_subscriber::fmt::writer::MakeWriterExt;
#[path = "fmt/yak_shave.rs"]
mod yak_shave;
fn main() {
// Log all `tracing` events to files prefixed with `debug`. Since these
// files will be written to very frequently, roll the log file every minute.
let debug_file = rolling::minutely("./logs", "debug");
// Log warnings and errors to a separate file. Since we expect these events
// to occur less frequently, roll that file on a daily basis instead.
let warn_file = rolling::daily("./logs", "warnings").with_max_level(tracing::Level::WARN);
let all_files = debug_file.and(warn_file);
tracing_subscriber::fmt()
.with_writer(all_files)
.with_ansi(false)
.with_max_level(tracing::Level::TRACE)
.init();
yak_shave::shave_all(6);
tracing::info!("sleeping for a minute...");
std::thread::sleep(std::time::Duration::from_secs(60));
tracing::info!("okay, time to shave some more yaks!");
yak_shave::shave_all(10);
}

View File

@ -23,6 +23,7 @@ rust-version = "1.51.0"
[dependencies]
crossbeam-channel = "0.5.0"
time = { version = "0.3", default-features = false, features = ["formatting"] }
parking_lot = { optional = true, version = "0.11.2" }
[dependencies.tracing-subscriber]
path = "../tracing-subscriber"

View File

@ -1,105 +0,0 @@
use std::io::{BufWriter, Write};
use std::{fs, io};
use crate::rolling::Rotation;
use std::fmt::Debug;
use std::fs::{File, OpenOptions};
use std::path::Path;
use time::OffsetDateTime;
#[derive(Debug)]
pub(crate) struct InnerAppender {
log_directory: String,
log_filename_prefix: String,
writer: BufWriter<File>,
next_date: Option<OffsetDateTime>,
rotation: Rotation,
}
impl io::Write for InnerAppender {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let now = OffsetDateTime::now_utc();
self.write_timestamped(buf, now)
}
fn flush(&mut self) -> io::Result<()> {
self.writer.flush()
}
}
impl InnerAppender {
pub(crate) fn new(
log_directory: &Path,
log_filename_prefix: &Path,
rotation: Rotation,
now: OffsetDateTime,
) -> io::Result<Self> {
let log_directory = log_directory.to_str().unwrap();
let log_filename_prefix = log_filename_prefix.to_str().unwrap();
let filename = rotation.join_date(log_filename_prefix, &now);
let next_date = rotation.next_date(&now);
Ok(InnerAppender {
log_directory: log_directory.to_string(),
log_filename_prefix: log_filename_prefix.to_string(),
writer: create_writer(log_directory, &filename)?,
next_date,
rotation,
})
}
fn write_timestamped(&mut self, buf: &[u8], date: OffsetDateTime) -> io::Result<usize> {
// Even if refresh_writer fails, we still have the original writer. Ignore errors
// and proceed with the write.
let buf_len = buf.len();
self.refresh_writer(date);
self.writer.write_all(buf).map(|_| buf_len)
}
fn refresh_writer(&mut self, now: OffsetDateTime) {
if self.should_rollover(now) {
let filename = self.rotation.join_date(&self.log_filename_prefix, &now);
self.next_date = self.rotation.next_date(&now);
match create_writer(&self.log_directory, &filename) {
Ok(writer) => {
if let Err(err) = self.writer.flush() {
eprintln!("Couldn't flush previous writer: {}", err);
}
self.writer = writer
}
Err(err) => eprintln!("Couldn't create writer for logs: {}", err),
}
}
}
fn should_rollover(&self, date: OffsetDateTime) -> bool {
// the `None` case means that the `InnerAppender` *never* rorates log files.
match self.next_date {
None => false,
Some(next_date) => date >= next_date,
}
}
}
fn create_writer(directory: &str, filename: &str) -> io::Result<BufWriter<File>> {
let file_path = Path::new(directory).join(filename);
Ok(BufWriter::new(open_file_create_parent_dirs(&file_path)?))
}
fn open_file_create_parent_dirs(path: &Path) -> io::Result<File> {
let mut open_options = OpenOptions::new();
open_options.append(true).create(true);
let new_file = open_options.open(path);
if new_file.is_err() {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
return open_options.open(path);
}
}
new_file
}

View File

@ -154,14 +154,14 @@ use crate::non_blocking::{NonBlocking, WorkerGuard};
use std::io::Write;
mod inner;
pub mod non_blocking;
pub mod rolling;
mod worker;
pub(crate) mod sync;
/// Convenience function for creating a non-blocking, off-thread writer.
///
/// See the [`non_blocking` module's docs][non_blocking]'s for more details.

View File

@ -30,32 +30,85 @@
//! let file_appender = RollingFileAppender::new(Rotation::HOURLY, "/some/directory", "prefix.log");
//! # }
//! ```
use crate::inner::InnerAppender;
use std::io;
use std::path::Path;
use crate::sync::{RwLock, RwLockReadGuard};
use std::{
fmt::Debug,
fs::{self, File, OpenOptions},
io::{self, Write},
path::Path,
sync::atomic::{AtomicUsize, Ordering},
};
use time::{format_description, Duration, OffsetDateTime, Time};
/// A file appender with the ability to rotate log files at a fixed schedule.
///
/// `RollingFileAppender` implements [`std:io::Write` trait][write] and will block on write operations.
/// It may be used with [`NonBlocking`][non-blocking] to perform writes without
/// blocking the current thread.
/// `RollingFileAppender` implements the [`std:io::Write` trait][write] and will
/// block on write operations. It may be used with [`NonBlocking`] to perform
/// writes without blocking the current thread.
///
/// [write]: https://doc.rust-lang.org/nightly/std/io/trait.Write.html
/// [non-blocking]: ../non_blocking/struct.NonBlocking.html
/// Additionally, `RollingFileAppender` also implements the [`MakeWriter`
/// trait][make_writer] from `tracing-appender`, so it may also be used
/// directly, without [`NonBlocking`].
///
/// [write]: std::io::Write
/// [`NonBlocking`]: super::non_blocking::NonBlocking
///
/// # Examples
///
/// Rolling a log file once every hour:
///
/// ```rust
/// # fn docs() {
/// let file_appender = tracing_appender::rolling::hourly("/some/directory", "prefix.log");
/// let file_appender = tracing_appender::rolling::hourly("/some/directory", "prefix");
/// # }
/// ```
///
/// Combining a `RollingFileAppender` with another [`MakeWriter`] implementation:
///
/// ```rust
/// # fn docs() {
/// use tracing_subscriber::fmt::writer::MakeWriterExt;
///
/// // Log all events to a rolling log file.
/// let logfile = tracing_appender::rolling::hourly("/logs", "myapp-logs");
/// // Log `INFO` and above to stdout.
/// let stdout = std::io::stdout.with_max_level(tracing::Level::INFO);
///
/// tracing_subscriber::fmt()
/// // Combine the stdout and log file `MakeWriter`s into one
/// // `MakeWriter` that writes to both
/// .with_writer(stdout.and(logfile))
/// .init();
/// # }
/// ```
///
/// [make_writer] tracing_subscriber::fmt::writer::MakeWriter
#[derive(Debug)]
pub struct RollingFileAppender {
inner: InnerAppender,
state: Inner,
writer: RwLock<File>,
}
/// A [writer] that writes to a rolling log file.
///
/// This is returned by the [`MakeWriter`] implementation for [`RollingFileAppender`].
///
/// [writer]: std::io::Write
/// [`MakeWriter`]: tracing_subscriber::fmt::writer::MakeWriter
#[derive(Debug)]
pub struct RollingWriter<'a>(RwLockReadGuard<'a, File>);
#[derive(Debug)]
struct Inner {
log_directory: String,
log_filename_prefix: String,
rotation: Rotation,
next_date: AtomicUsize,
}
// === impl RollingFileAppender ===
impl RollingFileAppender {
/// Creates a new `RollingFileAppender`.
///
@ -89,25 +142,62 @@ impl RollingFileAppender {
directory: impl AsRef<Path>,
file_name_prefix: impl AsRef<Path>,
) -> RollingFileAppender {
RollingFileAppender {
inner: InnerAppender::new(
directory.as_ref(),
file_name_prefix.as_ref(),
let now = OffsetDateTime::now_utc();
let log_directory = directory.as_ref().to_str().unwrap();
let log_filename_prefix = file_name_prefix.as_ref().to_str().unwrap();
let filename = rotation.join_date(log_filename_prefix, &now);
let next_date = rotation.next_date(&now);
let writer = RwLock::new(
create_writer(log_directory, &filename).expect("failed to create appender"),
);
Self {
state: Inner {
log_directory: log_directory.to_string(),
log_filename_prefix: log_filename_prefix.to_string(),
next_date: AtomicUsize::new(
next_date
.map(|date| date.unix_timestamp() as usize)
.unwrap_or(0),
),
rotation,
OffsetDateTime::now_utc(),
)
.expect("Failed to create appender"),
},
writer,
}
}
}
impl io::Write for RollingFileAppender {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.inner.write(buf)
let now = OffsetDateTime::now_utc();
let writer = self.writer.get_mut();
if self.state.should_rollover(now) {
let _did_cas = self.state.advance_date(now);
debug_assert!(_did_cas, "if we have &mut access to the appender, no other thread can have advanced the timestamp...");
self.state.refresh_writer(now, writer);
}
writer.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
self.writer.get_mut().flush()
}
}
impl<'a> tracing_subscriber::fmt::writer::MakeWriter<'a> for RollingFileAppender {
type Writer = RollingWriter<'a>;
fn make_writer(&'a self) -> Self::Writer {
let now = OffsetDateTime::now_utc();
// Should we try to roll over the log file?
if self.state.should_rollover(now) {
// Did we get the right to lock the file? If not, another thread
// did it and we can just make a writer.
if self.state.advance_date(now) {
self.state.refresh_writer(now, &mut *self.writer.write());
}
}
RollingWriter(self.writer.read())
}
}
@ -372,6 +462,79 @@ impl Rotation {
}
}
// === impl RollingWriter ===
impl io::Write for RollingWriter<'_> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
(&*self.0).write(buf)
}
fn flush(&mut self) -> io::Result<()> {
(&*self.0).flush()
}
}
// === impl Inner ===
impl Inner {
fn refresh_writer(&self, now: OffsetDateTime, file: &mut File) {
debug_assert!(self.should_rollover(now));
let filename = self.rotation.join_date(&self.log_filename_prefix, &now);
match create_writer(&self.log_directory, &filename) {
Ok(new_file) => {
if let Err(err) = file.flush() {
eprintln!("Couldn't flush previous writer: {}", err);
}
*file = new_file;
}
Err(err) => eprintln!("Couldn't create writer for logs: {}", err),
}
}
fn should_rollover(&self, date: OffsetDateTime) -> bool {
// the `None` case means that the `InnerAppender` *never* rotates log files.
let next_date = self.next_date.load(Ordering::Acquire);
if next_date == 0 {
return false;
}
date.unix_timestamp() as usize >= next_date
}
fn advance_date(&self, now: OffsetDateTime) -> bool {
let next_date = self
.rotation
.next_date(&now)
.map(|date| date.unix_timestamp() as usize)
.unwrap_or(0);
self.next_date
.compare_exchange(
now.unix_timestamp() as usize,
next_date,
Ordering::AcqRel,
Ordering::Acquire,
)
.is_ok()
}
}
fn create_writer(directory: &str, filename: &str) -> io::Result<File> {
let path = Path::new(directory).join(filename);
let mut open_options = OpenOptions::new();
open_options.append(true).create(true);
let new_file = open_options.open(path.as_path());
if new_file.is_err() {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
return open_options.open(path);
}
}
new_file
}
#[cfg(test)]
mod test {
use super::*;

View File

@ -0,0 +1,69 @@
//! Abstracts over sync primitive implementations.
//!
//! Optionally, we allow the Rust standard library's `RwLock` to be replaced
//! with the `parking_lot` crate's implementation. This may provide improved
//! performance in some cases. However, the `parking_lot` dependency is an
//! opt-in feature flag. Because `parking_lot::RwLock` has a slightly different
//! API than `std::sync::RwLock` (it does not support poisoning on panics), we
//! wrap the `std::sync` version to ignore poisoning.
#[allow(unused_imports)] // may be used later;
#[cfg(feature = "parking_lot")]
pub(crate) use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
#[cfg(not(feature = "parking_lot"))]
pub(crate) use self::std_impl::*;
#[cfg(not(feature = "parking_lot"))]
mod std_impl {
use std::sync::{self, PoisonError, TryLockError};
pub(crate) use std::sync::{RwLockReadGuard, RwLockWriteGuard};
#[derive(Debug)]
pub(crate) struct RwLock<T> {
inner: sync::RwLock<T>,
}
impl<T> RwLock<T> {
pub(crate) fn new(val: T) -> Self {
Self {
inner: sync::RwLock::new(val),
}
}
#[inline]
pub(crate) fn get_mut(&mut self) -> &mut T {
self.inner.get_mut().unwrap_or_else(PoisonError::into_inner)
}
#[inline]
pub(crate) fn read(&self) -> RwLockReadGuard<'_, T> {
self.inner.read().unwrap_or_else(PoisonError::into_inner)
}
#[inline]
#[allow(dead_code)] // may be used later;
pub(crate) fn try_read(&self) -> Option<RwLockReadGuard<'_, T>> {
match self.inner.try_read() {
Ok(guard) => Some(guard),
Err(TryLockError::Poisoned(e)) => Some(e.into_inner()),
Err(TryLockError::WouldBlock) => None,
}
}
#[inline]
pub(crate) fn write(&self) -> RwLockWriteGuard<'_, T> {
self.inner.write().unwrap_or_else(PoisonError::into_inner)
}
#[inline]
#[allow(dead_code)] // may be used later;
pub(crate) fn try_write(&self) -> Option<RwLockWriteGuard<'_, T>> {
match self.inner.try_write() {
Ok(guard) => Some(guard),
Err(TryLockError::Poisoned(e)) => Some(e.into_inner()),
Err(TryLockError::WouldBlock) => None,
}
}
}
}