Skip to content

Commit d015580

Browse files
committed
Fix handling of n-TFs-buffer in tpc-distribute-cmv workflow
1 parent 14ff7db commit d015580

2 files changed

Lines changed: 137 additions & 27 deletions

File tree

Detectors/TPC/workflow/include/TPCWorkflow/TPCDistributeCMVSpec.h

Lines changed: 136 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,40 @@ class TPCDistributeCMVSpec : public o2::framework::Task
152152
}
153153

154154
const auto tf = processing_helpers::getCurrentTF(pc);
155+
156+
// EOS sentinel from FLP partial flush (n-TFs-buffer > actual TFs delivered): accumulate raw
157+
// data into mCRURawBuffer here so endOfStream() can unpack it. This guard MUST come before
158+
// the firstTF auto-detection to prevent UINT32_MAX being treated as a real TF number.
159+
if (tf == std::numeric_limits<uint32_t>::max()) {
160+
if (mTimestampStart == 0) {
161+
mTimestampStart = pc.services().get<o2::framework::TimingInfo>().creation;
162+
}
163+
for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) {
164+
auto const* hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
165+
const unsigned int cru = hdr->subSpecification >> 7;
166+
if (!std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) {
167+
continue;
168+
}
169+
auto cmvVec = pc.inputs().get<pmr::vector<uint16_t>>(ref);
170+
auto& buf = mCRURawBuffer[cru];
171+
buf.insert(buf.end(), cmvVec.begin(), cmvVec.end());
172+
}
173+
// Capture orbit/BC from the FLP's EOS flush (sent alongside CMVGROUP); store once.
174+
if (mEOSFirstOrbit == 0) {
175+
for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) {
176+
auto const* hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
177+
const unsigned int cru = hdr->subSpecification >> 7;
178+
if (std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) {
179+
const auto orbitBC = pc.inputs().get<uint64_t>(ref);
180+
mEOSFirstOrbit = static_cast<uint32_t>(orbitBC >> 32);
181+
mEOSFirstBC = static_cast<uint16_t>(orbitBC & 0xFFFFu);
182+
break;
183+
}
184+
}
185+
}
186+
return;
187+
}
188+
155189
mLastSeenTF = tf; // track for endOfStream flush
156190

157191
// automatically detect firstTF in case firstTF was not specified
@@ -177,7 +211,6 @@ class TPCDistributeCMVSpec : public o2::framework::Task
177211

178212
if (relTF >= mProcessedCRU[currentBuffer].size()) {
179213
LOGP(warning, "Skipping tf {}: relative tf {} is larger than size of buffer: {}", tf, relTF, mProcessedCRU[currentBuffer].size());
180-
181214
// check number of processed CRUs for previous TFs. If CRUs are missing for them, they are probably lost/not received
182215
mProcessedTotalData = mCheckEveryNData;
183216
checkIntervalsForMissingData(pc, currentBuffer, relTF, tf);
@@ -190,7 +223,7 @@ class TPCDistributeCMVSpec : public o2::framework::Task
190223

191224
// record the absolute first TF of this aggregation interval
192225
if (mIntervalTFCount == 0) {
193-
mIntervalFirstTF = tf;
226+
mIntervalFirstTF = tf - mNTFsBuffer + 1;
194227
}
195228

196229
// set CCDB start timestamp once at the start of each aggregation interval
@@ -236,12 +269,9 @@ class TPCDistributeCMVSpec : public o2::framework::Task
236269
mProcessedCRUs[currentBuffer][relTF][cru] = true;
237270
}
238271

239-
// accumulate raw 16-bit CMVs into the flat array for the current TF
272+
// buffer the full concatenated CMV data for all N sub-TFs; unpacked at fill time below
240273
auto cmvVec = pc.inputs().get<pmr::vector<uint16_t>>(ref);
241-
const uint32_t nTimeBins = std::min(static_cast<uint32_t>(cmvVec.size()), cmv::NTimeBinsPerTF);
242-
for (uint32_t tb = 0; tb < nTimeBins; ++tb) {
243-
mCurrentTF.mDataPerTF[cru * cmv::NTimeBinsPerTF + tb] = cmvVec[tb];
244-
}
274+
mCRURawBuffer[cru].assign(cmvVec.begin(), cmvVec.end());
245275
}
246276

247277
LOGP(info, "Number of received CRUs for current TF: {} Needed a total number of processed CRUs of: {} Current TF: {}", mProcessedCRU[currentBuffer][relTF], mCRUs.size(), tf);
@@ -254,23 +284,50 @@ class TPCDistributeCMVSpec : public o2::framework::Task
254284
if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) {
255285
++mProcessedTFs[currentBuffer];
256286

257-
// Pre-processing: quantisation / rounding / zeroing (applied before compression)
258-
mCurrentTF.roundToIntegers(mRoundIntegersThreshold);
259-
if (mZeroThreshold > 0.f) {
260-
mCurrentTF.zeroSmallValues(mZeroThreshold);
261-
}
262-
if (mDynamicPrecisionSigma > 0.f) {
263-
mCurrentTF.trimGaussianPrecision(mDynamicPrecisionMean, mDynamicPrecisionSigma);
264-
}
265-
266-
// Compress; the raw CMVPerTF branch is used when all flags are zero
287+
// save orbit/BC captured from mOrbitFilter above before the unpack loop resets mCurrentTF
288+
const uint32_t batchFirstOrbit = mCurrentTF.firstOrbit;
289+
const uint16_t batchFirstBC = mCurrentTF.firstBC;
290+
291+
// Derive the per-sub-TF orbit stride from the actual data:
292+
// TimingInfo.firstTForbit is the orbit of the last real TF in the batch (the TF that triggered the FLP to send).
293+
// The FLP provides the orbit of the first real TF. Interpolating between the two gives the true stride,
294+
// independent of the GRPECS/config nHBFPerTF value.
295+
const uint32_t batchLastOrbit = static_cast<uint32_t>(pc.services().get<o2::framework::TimingInfo>().firstTForbit);
296+
const uint32_t orbitStep = (mNTFsBuffer > 1 && batchLastOrbit > batchFirstOrbit)
297+
? (batchLastOrbit - batchFirstOrbit) / static_cast<uint32_t>(mNTFsBuffer - 1)
298+
: static_cast<uint32_t>(o2::base::GRPGeomHelper::instance().getNHBFPerTF());
299+
mLastOrbitStep = orbitStep; // cache for EOS partial-batch fallback
300+
301+
// Unpack N sub-TFs from the concatenated raw buffer and fill one tree entry per real TF
267302
const uint8_t flags = buildCompressionFlags();
268-
if (flags != CMVEncoding::kNone) {
269-
mCurrentCompressedTF = mCurrentTF.compress(flags);
303+
for (int iTF = 0; iTF < mNTFsBuffer; ++iTF) {
304+
mCurrentTF = CMVPerTF{};
305+
mCurrentTF.firstOrbit = batchFirstOrbit + static_cast<uint32_t>(iTF) * orbitStep;
306+
mCurrentTF.firstBC = (iTF == 0) ? batchFirstBC : 0;
307+
for (const auto& [cru, buf] : mCRURawBuffer) {
308+
const uint32_t offset = static_cast<uint32_t>(iTF) * cmv::NTimeBinsPerTF;
309+
if (offset >= static_cast<uint32_t>(buf.size())) {
310+
break;
311+
}
312+
const uint32_t nBins = std::min(static_cast<uint32_t>(buf.size()) - offset, cmv::NTimeBinsPerTF);
313+
for (uint32_t tb = 0; tb < nBins; ++tb) {
314+
mCurrentTF.mDataPerTF[cru * cmv::NTimeBinsPerTF + tb] = buf[offset + tb];
315+
}
316+
}
317+
mCurrentTF.roundToIntegers(mRoundIntegersThreshold);
318+
if (mZeroThreshold > 0.f) {
319+
mCurrentTF.zeroSmallValues(mZeroThreshold);
320+
}
321+
if (mDynamicPrecisionSigma > 0.f) {
322+
mCurrentTF.trimGaussianPrecision(mDynamicPrecisionMean, mDynamicPrecisionSigma);
323+
}
324+
if (flags != CMVEncoding::kNone) {
325+
mCurrentCompressedTF = mCurrentTF.compress(flags);
326+
}
327+
mIntervalTree->Fill();
328+
++mIntervalTFCount;
270329
}
271-
272-
mIntervalTree->Fill();
273-
++mIntervalTFCount;
330+
mCRURawBuffer.clear();
274331
mCurrentTF = CMVPerTF{};
275332
}
276333

@@ -282,6 +339,55 @@ class TPCDistributeCMVSpec : public o2::framework::Task
282339

283340
void endOfStream(o2::framework::EndOfStreamContext& ec) final
284341
{
342+
// Unpack any partial TFs accumulated in mCRURawBuffer from the FLP's final flush at EOS
343+
if (!mCRURawBuffer.empty()) {
344+
size_t maxBufSize = 0;
345+
for (const auto& [cru, buf] : mCRURawBuffer) {
346+
maxBufSize = std::max(maxBufSize, buf.size());
347+
}
348+
const int nActualTFs = static_cast<int>(maxBufSize / cmv::NTimeBinsPerTF);
349+
LOGP(info, "Flushing {} partial TFs accumulated at end of stream", nActualTFs);
350+
if (nActualTFs > 0 && mIntervalTFCount == 0) {
351+
mIntervalFirstTF = mLastSeenTF + 1;
352+
}
353+
const uint8_t flags = buildCompressionFlags();
354+
// Use the actual stride seen in run(); fall back to GRP only if no complete batch was seen.
355+
const uint32_t eosOrbitStep = (mLastOrbitStep > 0)
356+
? mLastOrbitStep
357+
: static_cast<uint32_t>(o2::base::GRPGeomHelper::instance().getNHBFPerTF());
358+
for (int iTF = 0; iTF < nActualTFs; ++iTF) {
359+
mCurrentTF = CMVPerTF{};
360+
mCurrentTF.firstOrbit = mEOSFirstOrbit + static_cast<uint32_t>(iTF) * eosOrbitStep;
361+
mCurrentTF.firstBC = (iTF == 0) ? mEOSFirstBC : 0;
362+
for (const auto& [cru, buf] : mCRURawBuffer) {
363+
const uint32_t offset = static_cast<uint32_t>(iTF) * cmv::NTimeBinsPerTF;
364+
if (offset >= static_cast<uint32_t>(buf.size())) {
365+
break;
366+
}
367+
const uint32_t nBins = std::min(static_cast<uint32_t>(buf.size()) - offset, cmv::NTimeBinsPerTF);
368+
for (uint32_t tb = 0; tb < nBins; ++tb) {
369+
mCurrentTF.mDataPerTF[cru * cmv::NTimeBinsPerTF + tb] = buf[offset + tb];
370+
}
371+
}
372+
mCurrentTF.roundToIntegers(mRoundIntegersThreshold);
373+
if (mZeroThreshold > 0.f) {
374+
mCurrentTF.zeroSmallValues(mZeroThreshold);
375+
}
376+
if (mDynamicPrecisionSigma > 0.f) {
377+
mCurrentTF.trimGaussianPrecision(mDynamicPrecisionMean, mDynamicPrecisionSigma);
378+
}
379+
if (flags != CMVEncoding::kNone) {
380+
mCurrentCompressedTF = mCurrentTF.compress(flags);
381+
}
382+
mIntervalTree->Fill();
383+
++mIntervalTFCount;
384+
}
385+
mCRURawBuffer.clear();
386+
mCurrentTF = CMVPerTF{};
387+
// advance mLastSeenTF by the number of recovered EOS TFs so lastTF metadata is correct
388+
mLastSeenTF += static_cast<uint32_t>(nActualTFs);
389+
}
390+
285391
LOGP(info, "End of stream, flushing CMV interval ({} TFs)", mIntervalTFCount);
286392
// correct mTFEnd for the partial last interval so the CCDB validity end timestamp reflects the actual last TF, not the expected interval end
287393
mTFEnd[mBuffer] = mLastSeenTF;
@@ -351,6 +457,10 @@ class TPCDistributeCMVSpec : public o2::framework::Task
351457
std::vector<InputSpec> mOrbitFilter{}; ///< filter for CMVORBITINFO from FLP
352458
std::array<std::vector<bool>, 2> mOrbitInfoForwarded{}; ///< tracks whether orbit/BC has been captured per (buffer, relTF)
353459
uint32_t mLastSeenTF{0}; ///< last TF counter seen in run(), used to set lastTF in endOfStream flush
460+
uint32_t mEOSFirstOrbit{0}; ///< firstOrbit from the FLP's EOS partial-buffer flush (captured in run() EOS sentinel path)
461+
uint16_t mEOSFirstBC{0}; ///< firstBC from the FLP's EOS partial-buffer flush
462+
uint32_t mLastOrbitStep{0}; ///< per-sub-TF orbit stride from the last complete batch; used as fallback in endOfStream()
463+
std::unordered_map<uint32_t, std::vector<uint16_t>> mCRURawBuffer{}; ///< full concatenated CMV data per CRU for the current relTF slot; cleared after N-TF unpack
354464

355465
/// Returns real number of TFs taking buffer size into account
356466
unsigned int getNRealTFs() const { return mNTFsBuffer * mTimeFrames; }
@@ -418,20 +528,20 @@ class TPCDistributeCMVSpec : public o2::framework::Task
418528
// if last buffer has smaller time range check the whole last buffer
419529
if ((mTFStart[currentBuffer] > mTFStart[!currentBuffer]) && (relTF > mNTFsDataDrop)) {
420530
LOGP(warning, "Checking last buffer from {} to {}", mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].size());
421-
checkMissingData(pc, !currentBuffer, mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].size());
531+
checkMissingData(!currentBuffer, mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].size());
422532
LOGP(info, "All empty TFs for TF {} for current buffer filled with dummy and sent. Clearing buffer", tf);
423533
sendOutput(pc.outputs(), tf);
424534
finishInterval(pc, !currentBuffer, tf);
425535
}
426536

427537
const int tfEndCheck = std::clamp(static_cast<int>(relTF) - mNTFsDataDrop, 0, static_cast<int>(mProcessedCRU[currentBuffer].size()));
428538
LOGP(info, "Checking current buffer from {} to {}", mStartNTFsDataDrop[currentBuffer], tfEndCheck);
429-
checkMissingData(pc, currentBuffer, mStartNTFsDataDrop[currentBuffer], tfEndCheck);
539+
checkMissingData(currentBuffer, mStartNTFsDataDrop[currentBuffer], tfEndCheck);
430540
mStartNTFsDataDrop[currentBuffer] = tfEndCheck;
431541
}
432542
}
433543

434-
void checkMissingData(o2::framework::ProcessingContext& pc, const bool currentBuffer, const int startTF, const int endTF)
544+
void checkMissingData(const bool currentBuffer, const int startTF, const int endTF)
435545
{
436546
for (int iTF = startTF; iTF < endTF; ++iTF) {
437547
if (mProcessedCRU[currentBuffer][iTF] != mCRUs.size()) {
@@ -528,7 +638,7 @@ class TPCDistributeCMVSpec : public o2::framework::Task
528638

529639
const int nHBFPerTF = o2::base::GRPGeomHelper::instance().getNHBFPerTF();
530640
// use the actual number of TFs in this interval (mIntervalTFCount) rather than mTimeFrames, so the CCDB validity end is correct for partial last intervals
531-
const long timeStampEnd = mTimestampStart + static_cast<long>(mIntervalTFCount * mNTFsBuffer * nHBFPerTF * o2::constants::lhc::LHCOrbitMUS * 1e-3);
641+
const long timeStampEnd = mTimestampStart + static_cast<long>(mIntervalTFCount * nHBFPerTF * o2::constants::lhc::LHCOrbitMUS * 1e-3);
532642

533643
if (timeStampEnd <= mTimestampStart) {
534644
LOGP(warning, "Invalid CCDB timestamp range start:{} end:{}, skipping upload!",

Detectors/TPC/workflow/src/tpc-distribute-cmv.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config)
6767
}
6868
assert(timeframes >= nTFsBuffer);
6969
timeframes /= nTFsBuffer;
70-
LOGP(info, "Using {} timeframes as each TF contains {} CMVs", timeframes, nTFsBuffer);
70+
LOGP(info, "Aggregating {} TFs per CCDB object ({} aggregation intervals of {} TFs each)", timeframes * nTFsBuffer, timeframes, nTFsBuffer);
7171
const auto crusPerLane = nCRUs / nLanes + ((nCRUs % nLanes) != 0);
7272
WorkflowSpec workflow;
7373
for (int ilane = 0; ilane < nLanes; ++ilane) {

0 commit comments

Comments
 (0)