diff --git a/src/transforms/detect_exceptions/exception_detector.rs b/src/transforms/detect_exceptions/exception_detector.rs index d0f450865d650..f838619650521 100644 --- a/src/transforms/detect_exceptions/exception_detector.rs +++ b/src/transforms/detect_exceptions/exception_detector.rs @@ -6,6 +6,7 @@ use crate::{ use chrono::{DateTime, Utc}; use regex::Regex; use std::collections::HashMap; +use std::sync::Arc; use vector_lib::lookup::path::OwnedTargetPath; #[derive(Debug, Clone)] @@ -13,7 +14,7 @@ pub struct RuleTarget { regex: Regex, to_state: ExceptionState, } -type StateMachine = HashMap>; +pub type StateMachine = HashMap>; use rules::*; @@ -63,7 +64,7 @@ pub struct TraceAccumulator { impl TraceAccumulator { pub fn new( - languages: Vec, + state_machine: Arc, multiline_flush_interval: Duration, max_bytes: usize, max_lines: usize, @@ -79,7 +80,7 @@ impl TraceAccumulator { buffer_start_time: Utc::now(), accumulated_messages: vec![], detector: ExceptionDetector { - state_machine: get_state_machines(languages), + state_machine, current_state: ExceptionState::StartState, }, } @@ -195,7 +196,7 @@ impl TraceAccumulator { pub struct ExceptionDetectorConfig {} pub struct ExceptionDetector { - pub state_machine: StateMachine, + pub state_machine: Arc, pub current_state: ExceptionState, } @@ -259,7 +260,7 @@ mod exception_detector_tests { fn check_exception(line: &str, detects_end: bool) { let lines = split(line); let mut detector = ExceptionDetector { - state_machine: get_state_machines(default_programming_languages()), + state_machine: Arc::new(get_state_machines(default_programming_languages())), current_state: ExceptionState::StartState, }; let after_exc = if detects_end { EndTrace } else { InsideTrace }; diff --git a/src/transforms/detect_exceptions/mod.rs b/src/transforms/detect_exceptions/mod.rs index 108d3f3094292..ede8aef004de2 100644 --- a/src/transforms/detect_exceptions/mod.rs +++ b/src/transforms/detect_exceptions/mod.rs @@ -16,7 +16,7 @@ use crate::{ }; use async_stream::stream; use futures::{stream, Stream, StreamExt}; -use std::{collections::HashMap, pin::Pin, time::Duration}; +use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration}; use vector_lib::{ config::{clone_input_definitions, log_schema, LogNamespace}, configurable::configurable_component, @@ -192,7 +192,7 @@ impl TransformConfig for DetectExceptionsConfig { pub struct DetectExceptions { accumulators: HashMap, - languages: Vec, + state_machine: Arc, expire_after: Duration, flush_period: Duration, multiline_flush_interval: Duration, @@ -214,9 +214,14 @@ impl DetectExceptions { Ok(value) => value, }; + // Create the state machine once and share it across all accumulators + let state_machine = Arc::new(exception_detector::get_state_machines( + config.languages.clone(), + )); + Ok(DetectExceptions { accumulators: HashMap::new(), - languages: config.languages.clone(), + state_machine, group_by: config.group_by.clone(), expire_after: config.expire_after_ms, multiline_flush_interval: config.multiline_flush_interval_ms, @@ -235,7 +240,7 @@ impl DetectExceptions { self.accumulators.insert( discriminant.clone(), TraceAccumulator::new( - self.languages.clone(), + Arc::clone(&self.state_machine), self.multiline_flush_interval, self.max_bytes, self.max_lines,