Skip to content

Commit 95014d5

Browse files
authored
DPL: make sure firstTFOrbit and tfCounter are set correctly (#5701)
In order to do so, we need to capture tfCounter in register 14 and firstTForbit in register 15 when we capture the timeslice in register 0.
1 parent 0748bbb commit 95014d5

File tree

10 files changed

+73
-23
lines changed

10 files changed

+73
-23
lines changed

Framework/Core/include/Framework/DataDescriptorMatcher.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ class StartTimeValueMatcher : public ValueHolder<uint64_t>
189189
/// This will match the timing information which is currently in
190190
/// the DataProcessingHeader. Notice how we apply the scale to the
191191
/// actual values found.
192-
bool match(DataProcessingHeader const& dph, VariableContext& context) const;
192+
bool match(header::DataHeader const& dh, DataProcessingHeader const& dph, VariableContext& context) const;
193193

194194
private:
195195
uint64_t mScale;

Framework/Core/include/Framework/DataProcessingHeader.h

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
// In applying this license CERN does not waive the privileges and immunities
88
// granted to it by virtue of its status as an Intergovernmental Organization
99
// or submit itself to any jurisdiction.
10-
#ifndef FRAMEWORK_DATAPROCESSINGHEADER_H
11-
#define FRAMEWORK_DATAPROCESSINGHEADER_H
10+
#ifndef O2_FRAMEWORK_DATAPROCESSINGHEADER_H_
11+
#define O2_FRAMEWORK_DATAPROCESSINGHEADER_H_
1212

1313
#include "Headers/DataHeader.h"
1414

@@ -17,9 +17,7 @@
1717
#include <cassert>
1818
#include <chrono>
1919

20-
namespace o2
21-
{
22-
namespace framework
20+
namespace o2::framework
2321
{
2422

2523
//__________________________________________________________________________________________________
@@ -98,7 +96,6 @@ struct DataProcessingHeader : public header::BaseHeader {
9896
}
9997
};
10098

101-
} // namespace framework
102-
} // namespace o2
99+
} // namespace o2::framework
103100

104-
#endif // FRAMEWORK_DATAPROCESSINGHEADER_H
101+
#endif // O2_FRAMEWORK_DATAPROCESSINGHEADER_H_

Framework/Core/include/Framework/DataRelayer.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ class DataRelayer
106106
/// Notice how this avoids exposing the timesliceIndex directly
107107
/// so that we can mutex on it.
108108
TimesliceId getTimesliceForSlot(TimesliceSlot slot);
109+
110+
/// Get the firstTFOrbit associate to a given slot.
111+
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot);
112+
/// Get the firstTFCounter associate to a given slot.
113+
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot);
109114
/// Remove all pending messages
110115
void clear();
111116

Framework/Core/include/Framework/TimesliceIndex.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@ class TimesliceIndex
7474
/// could correspond to different slots once we implement wildcards
7575
/// (e.g. if we ask for InputSpec{"*", "CLUSTERS"}).
7676
inline TimesliceId getTimesliceForSlot(TimesliceSlot slot) const;
77+
/// Given @a slot, @return the firstTFOrbit (i.e. the variable at positiorn 15)
78+
/// associated to it.
79+
inline uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot) const;
80+
/// Given @a slot, @return the TFcounter (i.e. the variable at positiorn 14)
81+
/// associated to it.
82+
inline uint32_t getFirstTFCounterForSlot(TimesliceSlot slot) const;
7783
/// Given a slot, @return the VariableContext associated to it.
7884
/// This effectively means that the TimesliceIndex is now owner of the
7985
/// VariableContext.

Framework/Core/include/Framework/TimesliceIndex.inc

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,36 @@ inline TimesliceSlot TimesliceIndex::findOldestSlot() const
110110
inline TimesliceId TimesliceIndex::getTimesliceForSlot(TimesliceSlot slot) const
111111
{
112112
assert(mVariables.size() > slot.index);
113+
// timeslice is always at register 0
113114
auto pval = std::get_if<uint64_t>(&mVariables[slot.index].get(0));
114115
if (pval == nullptr) {
115116
return TimesliceId{TimesliceId::INVALID};
116117
}
117118
return TimesliceId{*pval};
118119
}
119120

121+
inline uint32_t TimesliceIndex::getFirstTFOrbitForSlot(TimesliceSlot slot) const
122+
{
123+
assert(mVariables.size() > slot.index);
124+
// firstTForbit is always at register 15
125+
auto pval = std::get_if<uint32_t>(&mVariables[slot.index].get(15));
126+
if (pval == nullptr) {
127+
return -1;
128+
}
129+
return *pval;
130+
}
131+
132+
inline uint32_t TimesliceIndex::getFirstTFCounterForSlot(TimesliceSlot slot) const
133+
{
134+
assert(mVariables.size() > slot.index);
135+
// firstTForbit is always at register 15
136+
auto pval = std::get_if<uint32_t>(&mVariables[slot.index].get(14));
137+
if (pval == nullptr) {
138+
return -1;
139+
}
140+
return *pval;
141+
}
142+
120143
inline data_matcher::VariableContext& TimesliceIndex::getVariablesForSlot(TimesliceSlot slot)
121144
{
122145
assert(mVariables.size() > slot.index);

Framework/Core/include/Framework/TimingInfo.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,18 @@
88
// granted to it by virtue of its status as an Intergovernmental Organization
99
// or submit itself to any jurisdiction.
1010

11-
#ifndef FRAMEWORK_TIMINGINFO_H
12-
#define FRAMEWORK_TIMINGINFO_H
11+
#ifndef O2_FRAMEWORK_TIMINGINFO_H_
12+
#define O2_FRAMEWORK_TIMINGINFO_H_
1313

1414
#include <cstddef>
15+
#include <cstdint>
1516

1617
/// This class holds the information about timing
1718
/// of the messages being processed.
1819
struct TimingInfo {
1920
size_t timeslice; /// the timeslice associated to current processing
21+
uint32_t firstTFOrbit = -1; /// the orbit the TF begins
22+
uint32_t tfCounter = -1; // the counter associated to a TF
2023
};
2124

22-
#endif // Timing information for the current computation
25+
#endif // O2_FRAMEWORK_TIMINGINFO_H_

Framework/Core/src/DataAllocator.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ FairMQMessagePtr DataAllocator::headerMessageFromOutput(Output const& spec,
9898
dh.subSpecification = spec.subSpec;
9999
dh.payloadSize = payloadSize;
100100
dh.payloadSerializationMethod = method;
101+
dh.tfCounter = mTimingInfo->tfCounter;
102+
dh.firstTForbit = mTimingInfo->firstTFOrbit;
101103

102104
DataProcessingHeader dph{mTimingInfo->timeslice, 1};
103105
auto& context = mRegistry->get<MessageContext>();

Framework/Core/src/DataDescriptorMatcher.cxx

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,7 @@
1616
#include "Framework/RuntimeError.h"
1717
#include <iostream>
1818

19-
namespace o2
20-
{
21-
namespace framework
22-
{
23-
namespace data_matcher
19+
namespace o2::framework::data_matcher
2420
{
2521

2622
ContextElement::Value const& VariableContext::get(size_t pos) const
@@ -101,14 +97,18 @@ bool SubSpecificationTypeValueMatcher::match(header::DataHeader const& header, V
10197
/// This will match the timing information which is currently in
10298
/// the DataProcessingHeader. Notice how we apply the scale to the
10399
/// actual values found.
104-
bool StartTimeValueMatcher::match(DataProcessingHeader const& dph, VariableContext& context) const
100+
bool StartTimeValueMatcher::match(header::DataHeader const& dh, DataProcessingHeader const& dph, VariableContext& context) const
105101
{
106102
if (auto ref = std::get_if<ContextRef>(&mValue)) {
107103
auto& variable = context.get(ref->index);
108104
if (auto value = std::get_if<uint64_t>(&variable)) {
109105
return (dph.startTime / mScale) == *value;
110106
}
111107
context.put({ref->index, dph.startTime / mScale});
108+
// We always put in 14 the tfCounter
109+
context.put({14, dh.tfCounter});
110+
// We always put in 15 the firstTForbit
111+
context.put({15, dh.firstTForbit});
112112
return true;
113113
} else if (auto v = std::get_if<uint64_t>(&mValue)) {
114114
return (dph.startTime / mScale) == *v;
@@ -262,11 +262,12 @@ bool DataDescriptorMatcher::match(char const* d, VariableContext& context) const
262262
} else if (auto pval4 = std::get_if<ConstantValueMatcher>(&mLeft)) {
263263
leftValue = pval4->match();
264264
} else if (auto pval5 = std::get_if<StartTimeValueMatcher>(&mLeft)) {
265+
auto dh = o2::header::get<header::DataHeader*>(d);
265266
auto dph = o2::header::get<DataProcessingHeader*>(d);
266267
if (dph == nullptr) {
267268
throw runtime_error("Cannot find DataProcessingHeader");
268269
}
269-
leftValue = pval5->match(*dph, context);
270+
leftValue = pval5->match(*dh, *dph, context);
270271
} else {
271272
throw runtime_error("Bad parsing tree");
272273
}
@@ -295,8 +296,9 @@ bool DataDescriptorMatcher::match(char const* d, VariableContext& context) const
295296
} else if (auto pval4 = std::get_if<ConstantValueMatcher>(&mRight)) {
296297
rightValue = pval4->match();
297298
} else if (auto pval5 = std::get_if<StartTimeValueMatcher>(&mRight)) {
299+
auto dh = o2::header::get<header::DataHeader*>(d);
298300
auto dph = o2::header::get<DataProcessingHeader*>(d);
299-
rightValue = pval5->match(*dph, context);
301+
rightValue = pval5->match(*dh, *dph, context);
300302
}
301303
// There are cases in which not having a rightValue might be legitimate,
302304
// so we do not throw an exception.
@@ -478,6 +480,4 @@ std::ostream& operator<<(std::ostream& os, DataDescriptorMatcher::Op const& op)
478480
return os;
479481
}
480482

481-
} // namespace data_matcher
482-
} // namespace framework
483-
} // namespace o2
483+
} // namespace o2::framework::data_matcher

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -845,6 +845,8 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context,
845845
ZoneScopedN("DataProcessingDevice::prepareForCurrentTimeslice");
846846
auto timeslice = relayer->getTimesliceForSlot(i);
847847
timingInfo->timeslice = timeslice.value;
848+
timingInfo->tfCounter = relayer->getFirstTFCounterForSlot(i);
849+
timingInfo->firstTFOrbit = relayer->getFirstTFOrbitForSlot(i);
848850
};
849851

850852
// When processing them, timers will have to be cleaned up

Framework/Core/src/DataRelayer.cxx

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,18 @@ DataRelayerStats const& DataRelayer::getStats() const
602602
return mStats;
603603
}
604604

605+
uint32_t DataRelayer::getFirstTFOrbitForSlot(TimesliceSlot slot)
606+
{
607+
std::scoped_lock<LockableBase(std::recursive_mutex)> lock(mMutex);
608+
return mTimesliceIndex.getFirstTFOrbitForSlot(slot);
609+
}
610+
611+
uint32_t DataRelayer::getFirstTFCounterForSlot(TimesliceSlot slot)
612+
{
613+
std::scoped_lock<LockableBase(std::recursive_mutex)> lock(mMutex);
614+
return mTimesliceIndex.getFirstTFCounterForSlot(slot);
615+
}
616+
605617
void DataRelayer::sendContextState()
606618
{
607619
std::scoped_lock<LockableBase(std::recursive_mutex)> lock(mMutex);

0 commit comments

Comments
 (0)