Skip to content

Commit 7bd9b75

Browse files
committed
DPL: add metric aggregation based on policies
1 parent e73e5b9 commit 7bd9b75

6 files changed

Lines changed: 732 additions & 0 deletions

File tree

Framework/Core/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ o2_add_library(Framework
119119
src/PluginManager.cxx
120120
src/RateLimiter.cxx
121121
src/ResourcesMonitoringHelper.cxx
122+
src/AggregationPolicy.cxx
123+
src/MetricAggregator.cxx
122124
src/ResourcePolicy.cxx
123125
src/ResourcePolicyHelpers.cxx
124126
src/RootArrowFilesystem.cxx
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#ifndef O2_FRAMEWORK_METRICAGGREGATOR_AGGREGATIONPOLICY_H
13+
#define O2_FRAMEWORK_METRICAGGREGATOR_AGGREGATIONPOLICY_H
14+
15+
#include <string>
16+
#include <string_view>
17+
#include <vector>
18+
#include <regex>
19+
20+
namespace o2
21+
{
22+
23+
namespace framework
24+
{
25+
26+
namespace metricaggregator
27+
{
28+
/// Defines the selection strategy for devices.
29+
enum class AggregationSelectionType {
30+
All,
31+
Specific
32+
};
33+
/// Defines the reduction strategy for metrics.
34+
enum class AggregationMetricType {
35+
Sum,
36+
Average,
37+
Rate,
38+
Specific,
39+
Simple
40+
};
41+
42+
/// Parses environment configurations and evaluates aggregation rules.
43+
class AggregationPolicy
44+
{
45+
public:
46+
AggregationPolicy() = default;
47+
~AggregationPolicy() = default;
48+
/// Reads configuration from environment variables and sets internal rules.
49+
void configureFromEnv();
50+
/// Returns the configured device selection type.
51+
AggregationSelectionType getSelection() const noexcept;
52+
/// Returns the configured global metric reduction type.
53+
AggregationMetricType getReduction() const noexcept;
54+
/// Determines the specific reduction type required for a given metric name.
55+
AggregationMetricType getAggregationTypeForMetric(std::string_view metricName) const;
56+
/// Evaluates whether the policy allows processing for the provided device name.
57+
bool selectDevice(std::string_view deviceId) const;
58+
59+
private:
60+
/// Maps a regular expression pattern to a specific aggregation type.
61+
struct MetricRule {
62+
std::regex metricPattern;
63+
AggregationMetricType type;
64+
};
65+
66+
std::vector<std::string> split(std::string_view input, char delim) const;
67+
/// Converts a string literal into an AggregationSelectionType enum.
68+
AggregationSelectionType parseSelectionType(const std::string& str);
69+
/// Converts a string literal into an AggregationMetricType enum.
70+
AggregationMetricType parseReductionType(const std::string& str);
71+
72+
AggregationSelectionType mSelection = AggregationSelectionType::All;
73+
AggregationMetricType mReduction = AggregationMetricType::Sum;
74+
std::vector<std::string> mSpecificDevices;
75+
std::vector<MetricRule> mSpecificMetricRules;
76+
};
77+
78+
} // namespace metricaggregator
79+
} // namespace framework
80+
} // namespace o2
81+
82+
#endif // O2_FRAMEWORK_METRICAGGREGATOR_AGGREGATIONPOLICY_H
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#ifndef O2_FRAMEWORK_METRICAGGREGATOR_METRICAGGREGATOR_H
13+
#define O2_FRAMEWORK_METRICAGGREGATOR_METRICAGGREGATOR_H
14+
15+
#include "Framework/ServiceHandle.h"
16+
#include "Framework/ServiceMetricsInfo.h"
17+
#include "Framework/Monitoring.h"
18+
19+
#include <fairmq/ProgOptions.h>
20+
#include <memory>
21+
#include <string>
22+
#include <vector>
23+
#include <unordered_map>
24+
25+
#include "Framework/AggregationPolicy.h"
26+
27+
namespace o2
28+
{
29+
30+
namespace framework
31+
{
32+
33+
namespace metricaggregator
34+
{
35+
/// Stores a single numeric measurement and its associated timestamp.
36+
struct MetricSample {
37+
double value = 0.0;
38+
std::size_t timestamp = 0;
39+
};
40+
41+
/// Collects and reduces metrics across multiple framework devices.
42+
/// Transmits the aggregated results to an external monitoring backend.
43+
class MetricAggregator
44+
{
45+
public:
46+
explicit MetricAggregator();
47+
~MetricAggregator() = default;
48+
/// Initializes the internal aggregation policy from environment variables.
49+
void setPolicy();
50+
/// Returns the current policy configuration as a formatted string.
51+
std::string getPolicy();
52+
/// Routes metrics to the appropriate processing function based on the policy reduction type.
53+
void mergeMetrics(const std::vector<DeviceMetricsInfo>& metrics,
54+
const DeviceMetricsInfo& driverMetrics,
55+
const std::vector<DeviceSpec>& specs);
56+
57+
private:
58+
/// Appends a suffix to the metric name based on the applied aggregation type.
59+
std::string getMetricNameFromPolicy(std::string_view metricName, AggregationMetricType aggregationType);
60+
/// Flushes metrics directly without aggregation.
61+
void flushMetricsSimple(const std::vector<DeviceMetricsInfo>& deviceMetrics,
62+
const DeviceMetricsInfo& driverMetrics,
63+
const std::vector<DeviceSpec>& specs);
64+
/// Flushes metrics by applying the aggregation policy.
65+
void flushMetrics(const std::vector<DeviceMetricsInfo>& deviceMetrics,
66+
const DeviceMetricsInfo& driverMetrics,
67+
const std::vector<DeviceSpec>& specs);
68+
/// Retrieves the monitoring backend type from environment variables.
69+
const char* getBackendFromEnv();
70+
71+
const char* mBackend = nullptr;
72+
std::unique_ptr<o2::monitoring::Monitoring> mMonitoring;
73+
/// Stores the previous samples required to compute rates over time.
74+
std::unordered_map<std::string, std::vector<MetricSample>> mLastSentSamples;
75+
std::unique_ptr<AggregationPolicy> mPolicy;
76+
};
77+
} // namespace metricaggregator
78+
} // namespace framework
79+
} // namespace o2
80+
81+
#endif // O2_FRAMEWORK_METRICAGGREGATOR_METRICAGGREGATOR_H
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#include "Framework/AggregationPolicy.h"
13+
#include "Framework/Logger.h"
14+
15+
#include <algorithm>
16+
#include <cstdlib>
17+
#include <sstream>
18+
#include <string>
19+
#include <vector>
20+
#include <regex>
21+
#include <stdexcept>
22+
23+
using namespace o2::framework::metricaggregator;
24+
25+
std::vector<std::string> AggregationPolicy::split(std::string_view input, char delim) const
26+
{
27+
std::vector<std::string> tokens;
28+
std::string token;
29+
std::istringstream tokenStream{std::string(input)};
30+
while (std::getline(tokenStream, token, delim)) {
31+
tokens.push_back(token);
32+
}
33+
return tokens;
34+
}
35+
36+
void AggregationPolicy::configureFromEnv()
37+
{
38+
const char* envPolicy = std::getenv("ALIEN_JDL_AGGREGATOR_POLICY");
39+
if (!envPolicy) {
40+
LOGP(warn, "[AggregationPolicy] ALIEN_JDL_AGGREGATOR_POLICY is not set. Using default 'all:simple'.");
41+
mSelection = AggregationSelectionType::All;
42+
mReduction = AggregationMetricType::Simple;
43+
return;
44+
}
45+
46+
try {
47+
std::string policyStr(envPolicy);
48+
std::vector<std::string> parts = split(policyStr, ':');
49+
50+
if (parts.size() < 2) {
51+
LOGP(error, "[AggregationPolicy] Invalid ALIEN_JDL_AGGREGATOR_POLICY format");
52+
return;
53+
}
54+
55+
mSelection = parseSelectionType(parts[0]);
56+
mReduction = parseReductionType(parts[1]);
57+
58+
if (mSelection == AggregationSelectionType::Specific) {
59+
const char* envDevices = std::getenv("ALIEN_JDL_AGGREGATOR_DEVICES");
60+
if (!envDevices) {
61+
throw std::invalid_argument("ALIEN_JDL_AGGREGATOR_DEVICES environment variable is required when selection type is 'specific'");
62+
}
63+
mSpecificDevices = split(std::string(envDevices), ',');
64+
}
65+
if (mReduction == AggregationMetricType::Specific) {
66+
const char* envMetrics = std::getenv("ALIEN_JDL_AGGREGATOR_METRICS");
67+
if (!envMetrics) {
68+
LOGP(warn, "[AggregationPolicy] ALIEN_JDL_AGGREGATOR_METRICS environment variable missing for 'specific' reduction type. Using default.");
69+
mSpecificMetricRules.push_back({std::regex(".*"), AggregationMetricType::Sum});
70+
return;
71+
}
72+
73+
std::stringstream metricsStream(envMetrics);
74+
std::string metricRuleStr;
75+
while (std::getline(metricsStream, metricRuleStr, ';')) {
76+
auto pos = metricRuleStr.find(':');
77+
if (pos == std::string::npos) {
78+
throw std::invalid_argument("Invalid metric rule format: " + metricRuleStr);
79+
}
80+
std::string typeStr = metricRuleStr.substr(0, pos);
81+
std::string patternStr = metricRuleStr.substr(pos + 1);
82+
AggregationMetricType type = parseReductionType(typeStr);
83+
mSpecificMetricRules.push_back({std::regex(patternStr), type});
84+
}
85+
}
86+
} catch (std::exception const& e) {
87+
LOGP(error, "[AggregationPolicy] Failed to parse ALIEN_JDL_AGGREGATOR_POLICY: {}", e.what());
88+
}
89+
}
90+
91+
AggregationMetricType AggregationPolicy::getAggregationTypeForMetric(std::string_view metricName) const
92+
{
93+
if (mReduction != AggregationMetricType::Specific) {
94+
return mReduction;
95+
}
96+
for (const auto& rule : mSpecificMetricRules) {
97+
if (std::regex_match(std::string(metricName), rule.metricPattern)) {
98+
return rule.type;
99+
}
100+
}
101+
if (mReduction == AggregationMetricType::Specific) {
102+
LOGP(error, "[AggregationPolicy] No specific aggregation type found for metric '{}'", metricName);
103+
}
104+
throw std::invalid_argument("No specific aggregation type found for metric: " + std::string(metricName));
105+
}
106+
107+
AggregationSelectionType AggregationPolicy::getSelection() const noexcept
108+
{
109+
return mSelection;
110+
}
111+
112+
AggregationMetricType AggregationPolicy::getReduction() const noexcept
113+
{
114+
return mReduction;
115+
}
116+
117+
AggregationSelectionType AggregationPolicy::parseSelectionType(const std::string& str)
118+
{
119+
if (str == "all") {
120+
return AggregationSelectionType::All;
121+
} else if (str == "specific") {
122+
return AggregationSelectionType::Specific;
123+
}
124+
throw std::invalid_argument("Invalid selection type: " + str);
125+
}
126+
127+
AggregationMetricType AggregationPolicy::parseReductionType(const std::string& str)
128+
{
129+
if (str == "sum") {
130+
return AggregationMetricType::Sum;
131+
} else if (str == "average") {
132+
return AggregationMetricType::Average;
133+
} else if (str == "rate") {
134+
return AggregationMetricType::Rate;
135+
} else if (str == "simple") {
136+
return AggregationMetricType::Simple;
137+
} else if (str == "specific") {
138+
return AggregationMetricType::Specific;
139+
}
140+
throw std::invalid_argument("Invalid reduction type: " + str);
141+
}
142+
143+
bool AggregationPolicy::selectDevice(std::string_view deviceId) const
144+
{
145+
if (mSelection == AggregationSelectionType::Specific) {
146+
return std::find(mSpecificDevices.begin(), mSpecificDevices.end(), deviceId) != mSpecificDevices.end();
147+
}
148+
return true;
149+
}

0 commit comments

Comments
 (0)