Mirror of https://github.com/tokio-rs/tracing.git (synced 2025-10-02 15:24:47 +00:00)
appender: fix race condition when logging on shutdown (#1125)
## Motivation

Fixes the race condition outlined in #1120.

## Solution

`Worker` now uses a two-stage shutdown approach. The first shutdown signal is sent through the main message channel to the `Worker` when the `WorkerGuard` is dropped. `WorkerGuard` then sends a second signal on a second, zero-capacity channel, where a `send()` only succeeds once a `recv()` is called on the other end. This guarantees that the `Worker` has flushed all of its messages before the `WorkerGuard` finishes dropping.

With this solution I'm no longer able to reproduce the race using the code sample provided in #1120.

Co-authored-by: Zeki Sherif <zekshi@amazon.com>
parent 91119f832a
commit 9cb829f5e5
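To make the two-stage handshake concrete before reading the diff, here is a minimal, self-contained sketch of the idea. It is not the crate's actual code: it assumes the crossbeam-channel crate (which tracing-appender uses for its channels), and the names `Msg::Line`, `tx`, and `shutdown_tx` are illustrative.

```rust
use crossbeam_channel::bounded;
use std::{thread, time::Duration};

enum Msg {
    Line(String),
    Shutdown,
}

fn main() {
    // Stage 1: the ordinary, buffered message channel.
    let (tx, rx) = bounded::<Msg>(128);
    // Stage 2: a zero-capacity (rendezvous) channel. A send() on it only
    // completes once the other side performs the matching recv().
    let (shutdown_tx, shutdown_rx) = bounded::<()>(0);

    let worker = thread::spawn(move || {
        // Drain buffered messages until the shutdown message arrives.
        while let Ok(msg) = rx.recv() {
            match msg {
                Msg::Line(line) => println!("{}", line),
                Msg::Shutdown => break,
            }
        }
        // Everything queued before Shutdown has been handled; complete the
        // rendezvous so the guard's drop can finish.
        let _ = shutdown_rx.recv();
    });

    tx.send(Msg::Line("hello".into())).unwrap();

    // Roughly what WorkerGuard::drop does: enqueue the shutdown message, then
    // wait (bounded by a timeout) for the worker to acknowledge it by
    // receiving on the zero-capacity channel.
    tx.send(Msg::Shutdown).unwrap();
    let _ = shutdown_tx.send_timeout((), Duration::from_millis(1000));

    worker.join().unwrap();
}
```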
tracing-appender/src/non_blocking.rs:

@@ -106,6 +106,7 @@ pub const DEFAULT_BUFFERED_LINES_LIMIT: usize = 128_000;
 pub struct WorkerGuard {
     guard: Option<JoinHandle<()>>,
     sender: Sender<Msg>,
+    shutdown: Sender<()>,
 }
 
 /// A non-blocking writer.
@@ -148,8 +149,11 @@ impl NonBlocking {
     ) -> (NonBlocking, WorkerGuard) {
         let (sender, receiver) = bounded(buffered_lines_limit);
 
-        let worker = Worker::new(receiver, writer);
-        let worker_guard = WorkerGuard::new(worker.worker_thread(), sender.clone());
+        let (shutdown_sender, shutdown_receiver) = bounded(0);
+
+        let worker = Worker::new(receiver, writer, shutdown_receiver);
+        let worker_guard =
+            WorkerGuard::new(worker.worker_thread(), sender.clone(), shutdown_sender);
 
         (
             Self {
@@ -245,10 +249,11 @@ impl MakeWriter for NonBlocking {
 }
 
 impl WorkerGuard {
-    fn new(handle: JoinHandle<()>, sender: Sender<Msg>) -> Self {
+    fn new(handle: JoinHandle<()>, sender: Sender<Msg>, shutdown: Sender<()>) -> Self {
         WorkerGuard {
             guard: Some(handle),
             sender,
+            shutdown,
         }
     }
 }
@@ -259,7 +264,14 @@ impl Drop for WorkerGuard {
             .sender
             .send_timeout(Msg::Shutdown, Duration::from_millis(100))
         {
-            Ok(_) | Err(SendTimeoutError::Disconnected(_)) => (),
+            Ok(_) => {
+                // Attempt to wait for `Worker` to flush all messages before dropping. This happens
+                // when the `Worker` calls `recv()` on a zero-capacity channel. Use `send_timeout`
+                // so that drop is not blocked indefinitely.
+                // TODO: Make timeout configurable.
+                let _ = self.shutdown.send_timeout((), Duration::from_millis(1000));
+            }
+            Err(SendTimeoutError::Disconnected(_)) => (),
             Err(SendTimeoutError::Timeout(e)) => println!(
                 "Failed to send shutdown signal to logging worker. Error: {:?}",
                 e
tracing-appender/src/worker.rs:

@@ -7,6 +7,7 @@ use std::{io, thread};
 pub(crate) struct Worker<T: Write + Send + Sync + 'static> {
     writer: T,
     receiver: Receiver<Msg>,
+    shutdown: Receiver<()>,
 }
 
 #[derive(Debug, Clone, Copy, Eq, PartialEq)]
@@ -18,8 +19,12 @@ pub(crate) enum WorkerState {
 }
 
 impl<T: Write + Send + Sync + 'static> Worker<T> {
-    pub(crate) fn new(receiver: Receiver<Msg>, writer: T) -> Worker<T> {
-        Self { writer, receiver }
+    pub(crate) fn new(receiver: Receiver<Msg>, writer: T, shutdown: Receiver<()>) -> Worker<T> {
+        Self {
+            writer,
+            receiver,
+            shutdown,
+        }
     }
 
     fn handle_recv(&mut self, result: &Result<Msg, RecvError>) -> io::Result<WorkerState> {
@@ -67,7 +72,10 @@ impl<T: Write + Send + Sync + 'static> Worker<T> {
         loop {
             match self.work() {
                 Ok(WorkerState::Continue) | Ok(WorkerState::Empty) => {}
-                Ok(WorkerState::Shutdown) | Ok(WorkerState::Disconnected) => break,
+                Ok(WorkerState::Shutdown) | Ok(WorkerState::Disconnected) => {
+                    let _ = self.shutdown.recv();
+                    break;
+                }
                 Err(_) => {
                     // TODO: Expose a metric for IO Errors, or print to stderr
                 }
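For downstream users the fix is transparent as long as the `WorkerGuard` returned by `tracing_appender::non_blocking` is kept alive until the end of the program, so its `Drop` impl can run the handshake shown above. A usage sketch, assuming the public `tracing_appender::non_blocking` constructor together with a `tracing_subscriber::fmt` subscriber:

```rust
fn main() {
    // `non_blocking` hands back the writer plus the guard whose Drop performs
    // the two-stage shutdown handshake from the diff above.
    let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout());

    tracing_subscriber::fmt()
        .with_writer(non_blocking)
        .init();

    tracing::info!("this line is flushed before the process exits");

    // `_guard` is dropped at the end of main: its Drop sends Msg::Shutdown and
    // then rendezvouses with the worker so buffered lines are written out
    // before the drop returns.
}
```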