Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions Framework/Core/include/Framework/InputRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ class InputRecord
// it's updated.
// FIXME: add ability to apply callbacks to deserialised objects.
auto id = ObjectCache::Id::fromRef(ref);
auto metadata = DataRefUtils::extractCCDBHeaders(ref);
id.etag = metadata.count("ETag") ? metadata.at("ETag") : "";
ConcreteDataMatcher matcher{header->dataOrigin, header->dataDescription, header->subSpecification};
// If the matcher does not have an entry in the cache, deserialise it
// and cache the deserialised object at the given id.
Expand All @@ -427,25 +429,29 @@ class InputRecord
void* obj = (void*)result.get();
callbacks.call<CallbackService::Id::CCDBDeserialised>((ConcreteDataMatcher&)matcher, (void*)obj);
cache.idToObject[id] = obj;
LOGP(info, "Caching in {} ptr to {} ({})", id.value, path, obj);
LOGP(info, "Caching in {} ptr ({}) to {} ({})", id.value, id.etag.data(), path, obj);
return result;
}
auto& oldId = cacheEntry->second;
// The id in the cache is the same, let's simply return it.
if (oldId.value == id.value) {
std::unique_ptr<ValueT const, Deleter<ValueT const>> result((ValueT const*)cache.idToObject[id], false);
LOGP(debug, "Returning cached entry {} for {} ({})", id.value, path, (void*)result.get());
// If the etag is present in both, we compare etags, otherwise we compare values
if (oldId == id) {
std::unique_ptr<ValueT const, Deleter<ValueT const>> result((ValueT const*)cache.idToObject[oldId], false);
LOGP(debug, "Returning cached entry {} ({}) for {} ({})", oldId.value, oldId.etag.data(), path, (void*)result.get());
return result;
}
// The id in the cache is different. Let's destroy the old cached entry
// and create a new one.
delete reinterpret_cast<ValueT*>(cache.idToObject[oldId]);
if constexpr (!std::is_base_of<o2::conf::ConfigurableParam, ValueT>::value) {
delete reinterpret_cast<ValueT*>(cache.idToObject[oldId]);
}
cache.idToObject.erase(oldId);
std::unique_ptr<ValueT const, Deleter<ValueT const>> result(DataRefUtils::as<CCDBSerialized<ValueT>>(ref).release(), false);
void* obj = (void*)result.get();
callbacks.call<CallbackService::Id::CCDBDeserialised>((ConcreteDataMatcher&)matcher, (void*)obj);
cache.idToObject[id] = obj;
LOGP(info, "Replacing cached entry {} with {} for {} ({})", oldId.value, id.value, path, obj);
oldId.value = id.value;
LOGP(info, "Replacing cached entry {} ({}) with {} ({}) for {} ({})", oldId.value, oldId.etag.data(), id.value, id.etag.data(), path, obj);
oldId = id;
return result;
} else {
throw runtime_error("Attempt to extract object from message with unsupported serialization type");
Expand Down Expand Up @@ -495,6 +501,8 @@ class InputRecord
// keep around an instance of the associated object and deserialise it only when
// it's updated.
auto id = ObjectCache::Id::fromRef(ref);
auto metadata = DataRefUtils::extractCCDBHeaders(ref);
id.etag = metadata.count("ETag") ? metadata.at("ETag") : "";
ConcreteDataMatcher matcher{header->dataOrigin, header->dataDescription, header->subSpecification};
// If the matcher does not have an entry in the cache, deserialise it
// and cache the deserialised object at the given id.
Expand All @@ -504,21 +512,23 @@ class InputRecord
auto cacheEntry = cache.matcherToMetadataId.find(path);
if (cacheEntry == cache.matcherToMetadataId.end()) {
cache.matcherToMetadataId.insert(std::make_pair(path, id));
cache.idToMetadata[id] = DataRefUtils::extractCCDBHeaders(ref);
LOGP(info, "Caching CCDB metadata {}: {}", id.value, path);
cache.idToMetadata[id] = metadata;
LOGP(info, "Caching CCDB metadata {} ({}): {}", id.value, id.etag.data(), path);
return cache.idToMetadata[id];
}
auto& oldId = cacheEntry->second;
// The id in the cache is the same, let's simply return it.
if (oldId.value == id.value) {
LOGP(debug, "Returning cached CCDB metatada {}: {}", id.value, path);
return cache.idToMetadata[id];
// If the etag is present in both, we compare etags, otherwise we compare values
if (oldId == id) {
LOGP(debug, "Returning cached CCDB metatada {} ({}): {}", oldId.value, oldId.etag.data(), path);
return cache.idToMetadata[oldId];
}
// The id in the cache is different. Let's destroy the old cached entry
// and create a new one.
LOGP(info, "Replacing cached entry {} with {} for {}", oldId.value, id.value, path);
cache.idToMetadata[id] = DataRefUtils::extractCCDBHeaders(ref);
oldId.value = id.value;
LOGP(info, "Replacing cached entry {} ({}) with {} ({}) for {}", oldId.value, oldId.etag.data(), id.value, id.etag.data(), path);
cache.idToMetadata.erase(oldId);
cache.idToMetadata[id] = metadata;
oldId = id;
return cache.idToMetadata[id];
}

Expand Down
6 changes: 5 additions & 1 deletion Framework/Core/include/Framework/ObjectCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ namespace o2::framework
struct ObjectCache {
struct Id {
int64_t value;
std::string etag;
static Id fromRef(DataRef& ref)
{
return {reinterpret_cast<int64_t>(ref.payload)};
return {reinterpret_cast<int64_t>(ref.payload), ""};
}
bool operator==(const Id& other) const
{
if (!etag.empty() && !other.etag.empty()) {
return etag == other.etag;
}
return value == other.value;
}

Expand Down