Skip to content

Commit c060a1e

Browse files
committed
CCDB: add extra metrics for amount of data requested / fetched
1 parent 3eec9b6 commit c060a1e

8 files changed

Lines changed: 63 additions & 7 deletions

File tree

CCDB/include/CCDB/BasicCCDBManager.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class CCDBManagerInstance
5555
long endvalidity = -1;
5656
long cacheValidFrom = 0; // time for which the object was cached
5757
long cacheValidUntil = -1; // object is guaranteed to be valid till this time (modulo new updates)
58+
size_t size = 0;
5859
size_t minSize = -1ULL;
5960
size_t maxSize = 0;
6061
int queries = 0;
@@ -229,6 +230,7 @@ class CCDBManagerInstance
229230
long mCreatedNotBefore = 0; // lower limit for object creation timestamp (TimeMachine mode) - If-Not-Before HTTP header
230231
long mTimerMS = 0; // timer for queries
231232
size_t mFetchedSize = 0; // total fetched size
233+
size_t mRequestedSize = 0; // total requested size (fetched + served from cache)
232234
int mQueries = 0; // total number of object queries
233235
int mFetches = 0; // total number of succesful fetches from CCDB
234236
int mFailures = 0; // total number of failed fetches
@@ -258,6 +260,7 @@ T* CCDBManagerInstance::getForTimeStamp(std::string const& path, long timestamp,
258260
if (sh != mHeaders.end()) {
259261
size_t s = atol(sh->second.c_str());
260262
mFetchedSize += s;
263+
mRequestedSize += s;
261264
}
262265
}
263266

@@ -272,6 +275,7 @@ T* CCDBManagerInstance::getForTimeStamp(std::string const& path, long timestamp,
272275
if (headers) {
273276
*headers = cached.cacheOfHeaders;
274277
}
278+
mRequestedSize += cached.size;
275279
return reinterpret_cast<T*>(cached.noCleanupPtr ? cached.noCleanupPtr : cached.objPtr.get());
276280
}
277281
ptr = mCCDBAccessor.retrieveFromTFileAny<T>(path, mMetaData, timestamp, &mHeaders, cached.uuid,
@@ -318,6 +322,8 @@ T* CCDBManagerInstance::getForTimeStamp(std::string const& path, long timestamp,
318322
if (sh != mHeaders.end()) {
319323
size_t s = atol(sh->second.c_str());
320324
mFetchedSize += s;
325+
mRequestedSize += s;
326+
cached.size = s;
321327
cached.minSize = std::min(s, cached.minSize);
322328
cached.maxSize = std::max(s, cached.minSize);
323329
}
@@ -342,12 +348,14 @@ T* CCDBManagerInstance::getForTimeStamp(std::string const& path, long timestamp,
342348
}
343349
auto end = std::chrono::system_clock::now();
344350
mTimerMS += std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
345-
auto *ref = o2::framework::ServiceRegistryRef::globalDeviceRef();
351+
auto* ref = o2::framework::ServiceRegistryRef::globalDeviceRef();
346352
if (ref && ref->active<framework::DataProcessingStats>()) {
347353
auto& stats = ref->get<o2::framework::DataProcessingStats>();
348354
stats.updateStats({(int)o2::framework::ProcessingStatsId::CCDB_CACHE_HIT, o2::framework::DataProcessingStats::Op::Set, (int64_t)mQueries - mFailures - mFetches});
349355
stats.updateStats({(int)o2::framework::ProcessingStatsId::CCDB_CACHE_MISS, o2::framework::DataProcessingStats::Op::Set, (int64_t)mFetches});
350356
stats.updateStats({(int)o2::framework::ProcessingStatsId::CCDB_CACHE_FAILURE, o2::framework::DataProcessingStats::Op::Set, (int64_t)mFailures});
357+
stats.updateStats({(int)o2::framework::ProcessingStatsId::CCDB_CACHE_FETCHED_BYTES, o2::framework::DataProcessingStats::Op::Set, (int64_t)mFetchedSize});
358+
stats.updateStats({(int)o2::framework::ProcessingStatsId::CCDB_CACHE_REQUESTED_BYTES, o2::framework::DataProcessingStats::Op::Set, (int64_t)mRequestedSize});
351359
}
352360
return ptr;
353361
}

CCDB/src/BasicCCDBManager.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ std::pair<int64_t, int64_t> CCDBManagerInstance::getRunDuration(int runnumber, b
101101

102102
std::string CCDBManagerInstance::getSummaryString() const
103103
{
104-
std::string res = fmt::format("{} queries, {} bytes", mQueries, fmt::group_digits(mFetchedSize));
104+
std::string res = fmt::format("{} queries, {} fetched / {} requested bytes", mQueries, fmt::group_digits(mFetchedSize), fmt::group_digits(mRequestedSize));
105105
if (mCachingEnabled) {
106106
res += fmt::format(" for {} objects", mCache.size());
107107
}

Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
#include "AnalysisCCDBHelpers.h"
1313
#include "CCDBFetcherHelper.h"
14+
#include "Framework/DataProcessingStats.h"
1415
#include "Framework/DeviceSpec.h"
1516
#include "Framework/TimingInfo.h"
1617
#include "Framework/ConfigParamRegistry.h"
@@ -105,7 +106,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
105106
std::unordered_map<std::string, int> bindings;
106107
fillValidRoutes(*helper, spec.outputs, bindings);
107108

108-
return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo) {
109+
return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) {
109110
O2_SIGNPOST_ID_GENERATE(sid, ccdb);
110111
O2_SIGNPOST_START(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.timeslice);
111112
for (auto& schema : schemas) {
@@ -182,6 +183,8 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
182183
allocator.adopt(Output{concrete.origin, concrete.description, concrete.subSpec}, outTable);
183184
}
184185

186+
stats.updateStats({(int)ProcessingStatsId::CCDB_CACHE_FETCHED_BYTES, DataProcessingStats::Op::Set, (int64_t)helper->totalFetchedBytes});
187+
stats.updateStats({(int)ProcessingStatsId::CCDB_CACHE_REQUESTED_BYTES, DataProcessingStats::Op::Set, (int64_t)helper->totalRequestedBytes});
185188
O2_SIGNPOST_END(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects");
186189
});
187190
});

Framework/CCDBSupport/src/CCDBFetcherHelper.cxx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,8 @@ auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr<CCDBFetcherHelper> con
254254
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
255255
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
256256
auto size = v.size();
257+
helper->totalFetchedBytes += size;
258+
helper->totalRequestedBytes += size;
257259
api.appendFlatHeader(v, headers);
258260
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
259261
helper->mapURL2DPLCache[path] = cacheId;
@@ -271,6 +273,8 @@ auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr<CCDBFetcherHelper> con
271273
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
272274
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
273275
auto size = v.size();
276+
helper->totalFetchedBytes += size;
277+
helper->totalRequestedBytes += size;
274278
api.appendFlatHeader(v, headers);
275279
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
276280
helper->mapURL2DPLCache[path] = cacheId;

Framework/CCDBSupport/src/CCDBFetcherHelper.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ struct CCDBFetcherHelper {
8484

8585
static ParserResult parseRemappings(char const*);
8686

87+
size_t totalFetchedBytes = 0;
88+
size_t totalRequestedBytes = 0;
8789
std::unordered_map<std::string, CCDBCacheInfo> mapURL2UUID;
8890
std::unordered_map<std::string, DataAllocator::CacheId> mapURL2DPLCache;
8991
std::string createdNotBefore = "0";

Framework/CCDBSupport/src/CCDBHelpers.cxx

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
#include "CCDBHelpers.h"
1313
#include "Framework/DeviceSpec.h"
14+
#include "Framework/DataProcessingStats.h"
1415
#include "Framework/Logger.h"
1516
#include "Framework/TimingInfo.h"
1617
#include "Framework/ConfigParamRegistry.h"
@@ -28,14 +29,16 @@ O2_DECLARE_DYNAMIC_LOG(ccdb);
2829
namespace o2::framework
2930
{
3031

31-
namespace {
32+
namespace
33+
{
3234
struct CCDBFetcherHelper {
3335
struct CCDBCacheInfo {
3436
std::string etag;
3537
size_t cacheValidUntil = 0;
3638
size_t cachePopulatedAt = 0;
3739
size_t cacheMiss = 0;
3840
size_t cacheHit = 0;
41+
size_t size = 0;
3942
size_t minSize = -1ULL;
4043
size_t maxSize = 0;
4144
int lastCheckedTF = 0;
@@ -50,6 +53,8 @@ struct CCDBFetcherHelper {
5053
std::string url;
5154
};
5255

56+
size_t totalFetchedBytes = 0;
57+
size_t totalRequestedBytes = 0;
5358
std::unordered_map<std::string, CCDBCacheInfo> mapURL2UUID;
5459
std::unordered_map<std::string, DataAllocator::CacheId> mapURL2DPLCache;
5560
std::string createdNotBefore = "0";
@@ -80,7 +85,7 @@ struct CCDBFetcherHelper {
8085
return apis[entry == remappings.end() ? "" : entry->second];
8186
}
8287
};
83-
}
88+
} // namespace
8489

8590
bool isPrefix(std::string_view prefix, std::string_view full)
8691
{
@@ -336,8 +341,11 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
336341
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
337342
helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
338343
helper->mapURL2UUID[path].cacheMiss++;
344+
helper->mapURL2UUID[path].size = v.size();
339345
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
340346
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
347+
helper->totalFetchedBytes += v.size();
348+
helper->totalRequestedBytes += v.size();
341349
api.appendFlatHeader(v, headers);
342350
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
343351
helper->mapURL2DPLCache[path] = cacheId;
@@ -350,8 +358,11 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
350358
helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
351359
helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]);
352360
helper->mapURL2UUID[path].cacheMiss++;
361+
helper->mapURL2UUID[path].size = v.size();
353362
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
354363
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
364+
helper->totalFetchedBytes += v.size();
365+
helper->totalRequestedBytes += v.size();
355366
api.appendFlatHeader(v, headers);
356367
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
357368
helper->mapURL2DPLCache[path] = cacheId;
@@ -368,6 +379,7 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
368379
auto cacheId = helper->mapURL2DPLCache[path];
369380
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
370381
helper->mapURL2UUID[path].cacheHit++;
382+
helper->totalRequestedBytes += helper->mapURL2UUID[path].size;
371383
allocator.adoptFromCache(output, cacheId, header::gSerializationMethodCCDB);
372384
// the outputBuffer was not used, can we destroy it?
373385
}
@@ -382,13 +394,13 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
382394
/// Add a callback on stop which dumps the statistics for the caching per
383395
/// path
384396
callbacks.set<CallbackService::Id::Stop>([helper]() {
385-
LOGP(info, "CCDB cache miss/hit ratio:");
397+
LOGP(info, "CCDB cache miss/hit ratio ({} fetched / {} requested bytes):", helper->totalFetchedBytes, helper->totalRequestedBytes);
386398
for (auto& entry : helper->mapURL2UUID) {
387399
LOGP(info, " {}: {}/{} ({}-{} bytes)", entry.first, entry.second.cacheMiss, entry.second.cacheHit, entry.second.minSize, entry.second.maxSize);
388400
}
389401
});
390402

391-
return adaptStateless([helper](DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo) {
403+
return adaptStateless([helper](DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) {
392404
auto sid = _o2_signpost_id_t{(int64_t)timingInfo.timeslice};
393405
O2_SIGNPOST_START(ccdb, sid, "fetchFromCCDB", "Fetching CCDB objects for timeslice %" PRIu64, (uint64_t)timingInfo.timeslice);
394406
static Long64_t orbitResetTime = -1;
@@ -429,8 +441,11 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
429441
if (etag.empty()) {
430442
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
431443
helper->mapURL2UUID[path].cacheMiss++;
444+
helper->mapURL2UUID[path].size = v.size();
432445
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
433446
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
447+
helper->totalFetchedBytes += v.size();
448+
helper->totalRequestedBytes += v.size();
434449
newOrbitResetTime = getOrbitResetTime(v);
435450
api.appendFlatHeader(v, headers);
436451
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone);
@@ -440,8 +455,11 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
440455
// somewhere here pruneFromCache should be called
441456
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
442457
helper->mapURL2UUID[path].cacheMiss++;
458+
helper->mapURL2UUID[path].size = v.size();
443459
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
444460
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
461+
helper->totalFetchedBytes += v.size();
462+
helper->totalRequestedBytes += v.size();
445463
newOrbitResetTime = getOrbitResetTime(v);
446464
api.appendFlatHeader(v, headers);
447465
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone);
@@ -455,6 +473,7 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
455473
auto cacheId = helper->mapURL2DPLCache[path];
456474
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
457475
helper->mapURL2UUID[path].cacheHit++;
476+
helper->totalRequestedBytes += helper->mapURL2UUID[path].size;
458477
allocator.adoptFromCache(output, cacheId, header::gSerializationMethodNone);
459478

460479
if (newOrbitResetTime != orbitResetTime) {
@@ -480,6 +499,8 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
480499
dtc.runNumber.data(), orbitResetTime, timingInfo.creation, timestamp, timingInfo.firstTForbit);
481500

482501
populateCacheWith(helper, timestamp, timingInfo, dtc, allocator);
502+
stats.updateStats({(int)ProcessingStatsId::CCDB_CACHE_FETCHED_BYTES, DataProcessingStats::Op::Set, (int64_t)helper->totalFetchedBytes});
503+
stats.updateStats({(int)ProcessingStatsId::CCDB_CACHE_REQUESTED_BYTES, DataProcessingStats::Op::Set, (int64_t)helper->totalRequestedBytes});
483504
O2_SIGNPOST_END(ccdb, _o2_signpost_id_t{(int64_t)timingInfo.timeslice}, "fetchFromCCDB", "Fetching CCDB objects");
484505
}); });
485506
}

Framework/Core/include/Framework/DataProcessingStats.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ enum struct ProcessingStatsId : short {
7272
CCDB_CACHE_HIT,
7373
CCDB_CACHE_MISS,
7474
CCDB_CACHE_FAILURE,
75+
CCDB_CACHE_FETCHED_BYTES,
76+
CCDB_CACHE_REQUESTED_BYTES,
7577
AVAILABLE_MANAGED_SHM_BASE = 512,
7678
};
7779

Framework/Core/src/CommonServices.cxx

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1176,6 +1176,22 @@ o2::framework::ServiceSpec CommonServices::dataProcessingStats()
11761176
.scope = Scope::DPL,
11771177
.minPublishInterval = 1000,
11781178
.maxRefreshLatency = 10000,
1179+
.sendInitialValue = true},
1180+
MetricSpec{.name = "ccdb-cache-fetched-bytes",
1181+
.enabled = true,
1182+
.metricId = static_cast<short>(ProcessingStatsId::CCDB_CACHE_FETCHED_BYTES),
1183+
.kind = Kind::UInt64,
1184+
.scope = Scope::DPL,
1185+
.minPublishInterval = 1000,
1186+
.maxRefreshLatency = 10000,
1187+
.sendInitialValue = true},
1188+
MetricSpec{.name = "ccdb-cache-requested-bytes",
1189+
.enabled = true,
1190+
.metricId = static_cast<short>(ProcessingStatsId::CCDB_CACHE_REQUESTED_BYTES),
1191+
.kind = Kind::UInt64,
1192+
.scope = Scope::DPL,
1193+
.minPublishInterval = 1000,
1194+
.maxRefreshLatency = 10000,
11791195
.sendInitialValue = true}};
11801196

11811197
for (auto& metric : metrics) {

0 commit comments

Comments
 (0)