diff --git a/Detectors/TPC/workflow/CMakeLists.txt b/Detectors/TPC/workflow/CMakeLists.txt index 0f8d73b1cbe7e..37ac398db40ec 100644 --- a/Detectors/TPC/workflow/CMakeLists.txt +++ b/Detectors/TPC/workflow/CMakeLists.txt @@ -304,4 +304,9 @@ o2_add_executable(cmv-distribute SOURCES src/tpc-distribute-cmv.cxx PUBLIC_LINK_LIBRARIES O2::TPCWorkflow) -add_subdirectory(readers) \ No newline at end of file +o2_add_executable(cmv-aggregate + COMPONENT_NAME tpc + SOURCES src/tpc-aggregate-cmv.cxx + PUBLIC_LINK_LIBRARIES O2::TPCWorkflow) + +add_subdirectory(readers) diff --git a/Detectors/TPC/workflow/README.md b/Detectors/TPC/workflow/README.md index b7a19da121e9b..5d2ccd3ac9166 100644 --- a/Detectors/TPC/workflow/README.md +++ b/Detectors/TPC/workflow/README.md @@ -285,7 +285,8 @@ The CMV workflows parse raw TPC data, buffer Common Mode Values per CRU on FLPs, |---|---|---| | `o2-tpc-cmv-to-vector` | `TPC/CMVVECTOR` | Parses raw TPC data and creates vectors of CMVs per CRU | | `o2-tpc-cmv-flp` | `TPC/CMVGROUP` | Buffers N TFs per CRU on the FLP and groups them for forwarding | -| `o2-tpc-cmv-distribute` | TTree / CCDB payload | Merges CRUs over N TFs on the calibration node, serializes the CMVContainer into a TTree, and either writes it to disk (`--dump-cmvs`) or forwards it as a CCDB object (`--enable-CCDB-output`) | +| `o2-tpc-cmv-distribute` | `TPC/CMVAGG*` | Routes grouped CMV batches from the calibration node to the aggregate workflow while preserving buffered TF and lane handling | +| `o2-tpc-cmv-aggregate` | TTree / CCDB payload | Collects all CRUs for each aggregate lane, preprocesses and compresses CMVs per buffered TF slice, then writes the CMVContainer TTree to disk (`--output-dir`) and/or forwards it as a CCDB object (`--enable-CCDB-output`) | #### `o2-tpc-cmv-to-vector` @@ -319,10 +320,27 @@ The CMV workflows parse raw TPC data, buffer Common Mode Values per CRU on FLPs, | `--timeframes` | 2000 | Number of TFs aggregated per calibration interval | | `--firstTF` | -1 | First time frame index; -1 = auto-detect from first incoming TF; values < -1 set an offset of `\|firstTF\|+1` TFs before the first interval begins | | `--lanes` | 1 | Number of parallel lanes (CRUs are split evenly across lanes) | +| `--output-lanes` | 1 | Number of aggregate pipelines downstream; these lanes rotate whole CMV aggregation intervals, not CRU subsets | | `--n-TFs-buffer` | 1 | Number of TFs buffered per group in the upstream `o2-tpc-cmv-flp` (must match that workflow's setting) | +| `--send-precise-timestamp` | false | Forward orbit-reset timing information needed by the aggregate workflow for precise CCDB validity timestamps | +| `--drop-data-after-nTFs` | 0 | Drop data for a relative TF slot after this many TFs have passed without receiving all CRUs; 0 uses the default derived from `--check-data-every-n` | +| `--check-data-every-n` | 0 | Check for missing CRU data every N invocations of the run function; -1 disables checking, 0 uses the default (timeframes/2) | +| `--nFactorTFs` | 1000 | Number of TFs to skip before flushing the oldest incomplete aggregation interval | + +#### `o2-tpc-cmv-aggregate` + +> **Important:** `--n-TFs-buffer` must be set to the same value as in `o2-tpc-cmv-distribute` and `o2-tpc-cmv-flp`. Mismatched values will silently corrupt the relTF mapping and TTree entry count. + +| Option | Default | Description | +|---|---|---| +| `--crus` | `0-359` | Full CRU range expected for each aggregate interval | +| `--timeframes` | 2000 | Number of TFs aggregated per calibration interval | +| `--input-lanes` | 1 | Number of aggregate pipelines; must match `o2-tpc-cmv-distribute --output-lanes` | +| `--n-TFs-buffer` | 1 | Number of real TFs packed into one CMV batch from upstream; **must match** `o2-tpc-cmv-distribute --n-TFs-buffer` | | `--enable-CCDB-output` | false | Forward the CMVContainer TTree as a CCDB object to `o2-calibration-ccdb-populator-workflow` | -| `--use-precise-timestamp` | false | Fetch orbit-reset and GRPECS from CCDB to compute a precise CCDB validity timestamp | -| `--dump-cmvs` | false | Write the CMVContainer TTree to a local ROOT file on disk | +| `--use-precise-timestamp` | false | Use orbit-reset timing forwarded by the distribute lane (requires `o2-tpc-cmv-distribute --send-precise-timestamp`) for precise CCDB validity start timestamps | +| `--output-dir` | `none` | Output directory for writing the CMVContainer ROOT file; must exist | +| `--nthreads` | 1 | Number of threads used for CMV preprocessing and compression; each thread processes a contiguous slice of buffered TFs | | `--use-sparse` | false | Sparse encoding: skip zero time bins (raw uint16 values; combine with `--use-compression-varint` or `--use-compression-huffman` for compressed sparse output) | | `--use-compression-varint` | false | Delta + zigzag + varint compression over all values; combined with `--use-sparse`: varint-encoded exact values at non-zero positions | | `--use-compression-huffman` | false | Huffman encoding over all values; combined with `--use-sparse`: Huffman-encoded exact values at non-zero positions | @@ -330,9 +348,6 @@ The CMV workflows parse raw TPC data, buffer Common Mode Values per CRU on FLPs, | `--cmv-round-integers-threshold` | 0 | Round values to nearest integer ADC for \|v\| ≤ N ADC before compression; 0 disables | | `--cmv-dynamic-precision-mean` | 1.0 | Gaussian centre in \|CMV\| (ADC) where the strongest fractional-bit trimming is applied | | `--cmv-dynamic-precision-sigma` | 0 | Gaussian width (ADC) for smooth CMV fractional-bit trimming; 0 disables | -| `--drop-data-after-nTFs` | 0 | Drop data for a relative TF slot after this many TFs have passed without receiving all CRUs; 0 uses the default derived from `--check-data-every-n` | -| `--check-data-every-n` | 0 | Check for missing CRU data every N invocations of the run function; -1 disables checking, 0 uses the default (timeframes/2) | -| `--nFactorTFs` | 1000 | Number of TFs to skip before flushing the oldest incomplete aggregation interval | ### Example 1 — Simple usage for testing @@ -361,7 +376,12 @@ o2-tpc-cmv-flp $ARGS_ALL \ --crus ${CRUS} | o2-tpc-cmv-distribute $ARGS_ALL \ --crus ${CRUS} \ - --dump-cmvs \ + --output-lanes 1 \ + --send-precise-timestamp \ +| +o2-tpc-cmv-aggregate $ARGS_ALL \ + --crus ${CRUS} \ + --output-dir ./ \ --enable-CCDB-output \ --cmv-zero-threshold 1.0 \ --cmv-dynamic-precision-mean 1.0 \ @@ -450,7 +470,12 @@ o2-dpl-raw-proxy $ARGS_ALL \ --dataspec "A:TPC/CMVGROUP;A:TPC/CMVORBITINFO" | o2-tpc-cmv-distribute $ARGS_ALL \ --crus ${CRUS} \ - --dump-cmvs \ + --output-lanes 1 \ + --send-precise-timestamp \ +| +o2-tpc-cmv-aggregate $ARGS_ALL \ + --crus ${CRUS} \ + --output-dir ./ \ --enable-CCDB-output \ --cmv-zero-threshold 1.0 \ --cmv-dynamic-precision-mean 1.0 \ @@ -461,4 +486,4 @@ o2-calibration-ccdb-populator-workflow $ARGS_ALL \ --ccdb-path ccdb-test.cern.ch:8080 ``` -The aggregator binds the ZeroMQ pull socket and waits for all FLPs to connect. Once `TPC/CMVGROUP` and `TPC/CMVORBITINFO` data arrive, `o2-tpc-cmv-distribute` merges them, applies compression, writes the object to the disk and uploads to the CCDB. +The aggregator binds the ZeroMQ pull socket and waits for all FLPs to connect. Once `TPC/CMVGROUP` and `TPC/CMVORBITINFO` data arrive, `o2-tpc-cmv-distribute` routes the grouped CMV batches, and `o2-tpc-cmv-aggregate` gathers the full CRU set for each interval, applies preprocessing and compression, writes the object to disk, and uploads it to the CCDB. diff --git a/Detectors/TPC/workflow/include/TPCWorkflow/TPCAggregateCMVSpec.h b/Detectors/TPC/workflow/include/TPCWorkflow/TPCAggregateCMVSpec.h new file mode 100644 index 0000000000000..7fecd4b1e295b --- /dev/null +++ b/Detectors/TPC/workflow/include/TPCWorkflow/TPCAggregateCMVSpec.h @@ -0,0 +1,637 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file TPCAggregateCMVSpec.h +/// @author Tuba Gündem, tuba.gundem@cern.ch +/// @brief TPC aggregation of distributed CMVs, including preprocessing, compression and CCDB output + +#ifndef O2_TPCAGGREGATECMVSPEC_H +#define O2_TPCAGGREGATECMVSPEC_H + +#include +#include +#include +#include +#include +#include +#include +#include "TMemFile.h" +#include "TParameter.h" +#include "Framework/Task.h" +#include "Framework/ControlService.h" +#include "Framework/Logger.h" +#include "Framework/DataProcessorSpec.h" +#include "Framework/InputRecordWalker.h" +#include "Framework/DataTakingContext.h" +#include "Framework/DataRefUtils.h" +#include "Headers/DataHeader.h" +#include "Framework/ConfigParamRegistry.h" +#include "CommonDataFormat/Pair.h" +#include "CCDB/CcdbApi.h" +#include "CCDB/CcdbObjectInfo.h" +#include "DetectorsCalibration/Utils.h" +#include "TPCWorkflow/TPCDistributeCMVSpec.h" +#include "TPCWorkflow/ProcessingHelpers.h" +#include "TPCCalibration/CMVContainer.h" +#include "DataFormatsTPC/CMV.h" +#include "DetectorsBase/GRPGeomHelper.h" +#include "MemoryResources/MemoryResources.h" +#include "CommonUtils/StringUtils.h" + +using namespace o2::framework; +using o2::header::gDataOriginTPC; + +namespace o2::tpc +{ + +class TPCAggregateCMVDevice : public o2::framework::Task +{ + public: + TPCAggregateCMVDevice(const int lane, + const std::vector& crus, + const unsigned int timeframes, + const bool sendCCDB, + const bool usePreciseTimestamp, + const int nTFsBuffer, + std::shared_ptr req) + : mLaneId{lane}, + mCRUs{crus}, + mTimeFrames{timeframes}, + mSendCCDB{sendCCDB}, + mUsePreciseTimestamp{usePreciseTimestamp}, + mNTFsBuffer{nTFsBuffer}, + mProcessedCRU(timeframes), + mProcessedCRUs(timeframes), + mRawCMVs(timeframes), + mOrbitInfo(timeframes), + mOrbitStep(timeframes), + mOrbitInfoSeen(timeframes, false), + mTFCompleted(timeframes, false), + mCCDBRequest(req) + { + std::sort(mCRUs.begin(), mCRUs.end()); + for (auto& crusMap : mProcessedCRUs) { + crusMap.reserve(mCRUs.size()); + for (const auto cruID : mCRUs) { + crusMap.emplace(cruID, false); + } + } + initIntervalTree(); + } + + void init(o2::framework::InitContext& ic) final + { + o2::base::GRPGeomHelper::instance().setRequest(mCCDBRequest); + mOutputDir = ic.options().get("output-dir"); + if ((mOutputDir != "none") && (mOutputDir != "/dev/null")) { + mOutputDir = o2::utils::Str::rectifyDirectory(mOutputDir); + } + mUseCompressionVarint = ic.options().get("use-compression-varint"); + mUseSparse = ic.options().get("use-sparse"); + mUseCompressionHuffman = ic.options().get("use-compression-huffman"); + mRoundIntegersThreshold = static_cast(ic.options().get("cmv-round-integers-threshold")); + mZeroThreshold = ic.options().get("cmv-zero-threshold"); + mDynamicPrecisionMean = ic.options().get("cmv-dynamic-precision-mean"); + mDynamicPrecisionSigma = ic.options().get("cmv-dynamic-precision-sigma"); + mThreads = std::max(1, ic.options().get("nthreads-compression")); + LOGP(info, "CMV aggregation settings: output-dir={}, use-compression-varint={}, use-sparse={}, use-compression-huffman={}, cmv-round-integers-threshold={}, cmv-zero-threshold={}, cmv-dynamic-precision-mean={}, cmv-dynamic-precision-sigma={}, nthreads-compression={}", + mOutputDir, mUseCompressionVarint, mUseSparse, mUseCompressionHuffman, mRoundIntegersThreshold, mZeroThreshold, mDynamicPrecisionMean, mDynamicPrecisionSigma, mThreads); + initIntervalTree(); + } + + void finaliseCCDB(ConcreteDataMatcher& matcher, void* obj) final + { + o2::base::GRPGeomHelper::instance().finaliseCCDB(matcher, obj); + } + + void run(o2::framework::ProcessingContext& pc) final + { + // Consume CCDB inputs; return early when they are the only valid inputs in this slot + int nCCDBInputs = 0; + if (pc.inputs().isValid("grpecs")) { + pc.inputs().get("grpecs"); + ++nCCDBInputs; + } + if (mUsePreciseTimestamp && pc.inputs().isValid("orbitreset")) { + mTFInfo = pc.inputs().get>("orbitreset"); + ++nCCDBInputs; + } + if (nCCDBInputs > 0 && pc.inputs().countValidInputs() == nCCDBInputs) { + return; + } + + const auto currTF = processing_helpers::getCurrentTF(pc); + + if (mTFFirst == -1) { + for (auto& ref : InputRecordWalker(pc.inputs(), mFirstTFFilter)) { + mTFFirst = pc.inputs().get(ref); + mIntervalFirstTF = mTFFirst; + mHasIntervalFirstTF = true; + break; + } + } + + // EOS sentinel forwarded by the distribute lane for partial batches (n-TFs-buffer > actual TFs delivered) + if (currTF == std::numeric_limits::max()) { + if (mTimestampStart == 0) { + mTimestampStart = pc.services().get().creation; + } + collectEOSInputs(pc); + return; + } + + if (mTFFirst == -1) { + mTFFirst = currTF; + mIntervalFirstTF = mTFFirst; + mHasIntervalFirstTF = true; + LOGP(warning, "firstTF not found. Setting {} as first TF for aggregate lane {}", mTFFirst, mLaneId); + } + + const long relTF = (currTF - mTFFirst) / mNTFsBuffer; + if ((relTF < 0) || (relTF >= static_cast(mTimeFrames))) { + LOGP(warning, "relTF={} out of range [0, {}) for TF {}, skipping", relTF, mTimeFrames, currTF); + return; + } + + // Capture orbit info first so setTimestampCCDB can use the measured stride + if (!mOrbitInfoSeen[relTF]) { + // all CRUs within a batch carry identical timing, so the first one is sufficient + for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) { + mOrbitInfo[relTF] = pc.inputs().get(ref); + const auto batchFirstOrbit = static_cast(mOrbitInfo[relTF] >> 32); + // TimingInfo.firstTForbit is the orbit of the last real TF in the batch (the TF that triggered the FLP to send). + // The FLP provides the orbit of the first real TF. Interpolating between the two gives the true stride, + // independent of the GRPECS/config nHBFPerTF value. + const auto batchLastOrbit = static_cast(pc.services().get().firstTForbit); + const auto defaultOrbitStep = static_cast(o2::base::GRPGeomHelper::instance().getNHBFPerTF()); + mOrbitStep[relTF] = (mNTFsBuffer > 1 && batchLastOrbit > batchFirstOrbit) ? (batchLastOrbit - batchFirstOrbit) / static_cast(mNTFsBuffer - 1) : defaultOrbitStep; + mLastOrbitStep = mOrbitStep[relTF]; + mOrbitInfoSeen[relTF] = true; + break; + } + } + + if (mTimestampStart == 0) { + setTimestampCCDB(relTF, mOrbitStep[relTF], pc); + } + + for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) { + auto const* hdr = DataRefUtils::getHeader(ref); + const unsigned int cru = hdr->subSpecification; + if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) { + LOGP(debug, "Received CMV data from CRU {} which is not part of this aggregate lane", cru); + continue; + } + if (mProcessedCRUs[relTF][cru]) { + continue; + } + + auto cmvVec = pc.inputs().get>(ref); + mRawCMVs[relTF][cru] = std::vector(cmvVec.begin(), cmvVec.end()); + mProcessedCRUs[relTF][cru] = true; + ++mProcessedCRU[relTF]; + } + + if (mProcessedCRU[relTF] == mCRUs.size() && !mTFCompleted[relTF]) { + mTFCompleted[relTF] = true; + ++mProcessedTFs; + mLastSeenTF = currTF; + } + + if (mProcessedTFs == mTimeFrames) { + materializeBufferedTFs(false); + sendOutput(pc.outputs()); + reset(); + } + } + + void endOfStream(o2::framework::EndOfStreamContext& ec) final + { + materializeBufferedTFs(true); + materializeEOSBuffer(); + sendOutput(ec.outputs()); + ec.services().get().readyToQuit(QuitRequest::Me); + } + + static constexpr header::DataDescription getDataDescriptionCCDBCMV() { return header::DataDescription{"TPC_CMV"}; } + + private: + struct PreparedTF { + CMVPerTF tf{}; + CMVPerTFCompressed compressed{}; + }; + + const int mLaneId{0}; ///< aggregate lane index (matches the distribute output lane) + std::vector mCRUs{}; ///< CRUs expected on this lane (sorted for binary_search) + const unsigned int mTimeFrames{}; ///< number of CMV batches per calibration interval (= total TFs / nTFsBuffer) + const bool mSendCCDB{false}; ///< send serialised TTree to the CCDB populator + const bool mUsePreciseTimestamp{false}; ///< use orbit-reset info forwarded by the distribute lane for precise CCDB timestamps + const int mNTFsBuffer{1}; ///< number of real TFs packed into one CMV batch (must match TPCFLPCMVSpec) + std::string mOutputDir{"none"}; ///< directory to write local ROOT files ("none" or "/dev/null" to disable) + bool mUseCompressionVarint{false}; ///< delta+zigzag+varint compression for all values (dense path); combined with mUseSparse → sparse+varint + bool mUseSparse{false}; ///< sparse encoding (skip zero time bins); alone = raw uint16; combined with varint/Huffman → sparse+compressed + bool mUseCompressionHuffman{false}; ///< Huffman encoding; combined with mUseSparse → sparse+Huffman + uint16_t mRoundIntegersThreshold{0}; ///< round values to nearest integer ADC for |v| <= N ADC before compression; 0 = disabled + float mZeroThreshold{0.f}; ///< zero out CMV values whose float magnitude is below this threshold; 0 = disabled + float mDynamicPrecisionMean{1.f}; ///< Gaussian centre in |CMV| ADC where the strongest fractional-bit trimming is applied + float mDynamicPrecisionSigma{0.f}; ///< Gaussian width in ADC for fractional-bit trimming; 0 disables + int mThreads{1}; ///< number of threads for CMV preprocessing and compression in appendBatchToTree() + long mTFFirst{-1}; ///< absolute TF index of the first real TF in the current interval (-1 = not yet received) + long mTimestampStart{0}; ///< CCDB validity start timestamp in ms (0 until set by setTimestampCCDB) + long mIntervalFirstTF{0}; ///< absolute TF counter stored in the TTree UserInfo as "firstTF" + bool mHasIntervalFirstTF{false}; ///< true once mIntervalFirstTF has been set for the current interval + unsigned int mProcessedTFs{0}; ///< number of completed CMV batches in the current interval + std::vector mProcessedCRU{}; ///< counter of received CRUs per relTF slot; triggers completion when it reaches mCRUs.size() + std::vector> mProcessedCRUs{}; ///< per-CRU received flag per relTF ([relTF][CRU]); prevents double-counting on retransmission + std::vector>> mRawCMVs{}; ///< buffered raw CMV data per (relTF, CRU); unpacked in appendBatchToTree() + std::vector mOrbitInfo{}; ///< packed (firstOrbit << 32 | firstBC) per relTF, forwarded by the distribute lane + std::vector mOrbitStep{}; ///< per-sub-TF orbit stride per relTF; derived from actual batch timing + std::vector mOrbitInfoSeen{}; ///< true once orbit/BC has been captured for each relTF slot + std::vector mTFCompleted{}; ///< true once all CRUs have been received for a given relTF slot + std::unordered_map> mEOSRawCMVs{}; ///< CMV data received during the EOS sentinel path (partial batch at end of run) + uint32_t mEOSFirstOrbit{0}; ///< firstOrbit captured from the FLP's EOS partial-buffer flush + uint16_t mEOSFirstBC{0}; ///< firstBC captured from the FLP's EOS partial-buffer flush + uint32_t mLastOrbitStep{0}; ///< cached orbit stride from the last complete batch; fallback for the EOS partial batch + uint32_t mLastSeenTF{0}; ///< last TF counter seen in run(); used to compute lastTF metadata in the TTree + unsigned int mIntervalTFCount{0}; ///< number of TTree entries filled for the current interval + dataformats::Pair mTFInfo{}; ///< orbit-reset time (ms) and NHBFPerTF forwarded by distribute lane 0 for precise timestamps + std::shared_ptr mCCDBRequest; ///< GRPECS request so GRPGeomHelper::getNHBFPerTF() is valid in this process + std::unique_ptr mIntervalTree{}; ///< in-memory TTree accumulating one entry per real TF; serialised to CCDB/disk at interval end + CMVPerTF mCurrentTF{}; ///< staging object written to the TTree branch for the uncompressed path + CMVPerTFCompressed mCurrentCompressedTF{}; ///< staging object written to the TTree branch when any compression flags are set + const std::vector mFilter{ + {"cmvagg", + ConcreteDataTypeMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(mLaneId)}, + Lifetime::Sporadic}}; + const std::vector mOrbitFilter{ + {"cmvorbit", + ConcreteDataMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(mLaneId), header::DataHeader::SubSpecificationType{static_cast(mLaneId)}}, + Lifetime::Sporadic}}; + const std::vector mFirstTFFilter{ + {"firstTF", + ConcreteDataMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{static_cast(mLaneId)}}, + Lifetime::Sporadic}}; + + uint8_t buildCompressionFlags() const + { + uint8_t flags = CMVEncoding::kNone; + if (mUseSparse) { + flags |= CMVEncoding::kSparse; + } + if (mUseCompressionHuffman) { + flags |= CMVEncoding::kDelta | CMVEncoding::kZigzag | CMVEncoding::kHuffman; + } else if (mUseCompressionVarint) { + flags |= CMVEncoding::kDelta | CMVEncoding::kZigzag | CMVEncoding::kVarint; + } + return flags; + } + + /// Create a fresh in-memory TTree for the next aggregation interval + /// Uses a single CMVPerTFCompressed branch whenever any compression is active or a raw CMVPerTF branch when no compression flags are set. + void initIntervalTree() + { + mIntervalTree = std::make_unique("ccdb_object", "ccdb_object"); + mIntervalTree->SetAutoSave(0); + mIntervalTree->SetDirectory(nullptr); + if (buildCompressionFlags() != CMVEncoding::kNone) { + mIntervalTree->Branch("CMVPerTFCompressed", &mCurrentCompressedTF); + } else { + mIntervalTree->Branch("CMVPerTF", &mCurrentTF); + } + } + + /// Accumulate CMV data from the EOS sentinel (TF == UINT32_MAX), i.e. a partial batch forwarded by the distribute lane when n-TFs-buffer > number of TFs actually delivered + /// Orbit/BC is captured once; raw data is appended per CRU into mEOSRawCMVs + void collectEOSInputs(o2::framework::ProcessingContext& pc) + { + if (mEOSFirstOrbit == 0) { + for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) { + const auto orbitBC = pc.inputs().get(ref); + mEOSFirstOrbit = static_cast(orbitBC >> 32); + mEOSFirstBC = static_cast(orbitBC & 0xFFFFu); + break; + } + } + + for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) { + auto const* hdr = DataRefUtils::getHeader(ref); + const unsigned int cru = hdr->subSpecification; + if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) { + continue; + } + auto cmvVec = pc.inputs().get>(ref); + auto& buffer = mEOSRawCMVs[cru]; + buffer.insert(buffer.end(), cmvVec.begin(), cmvVec.end()); + } + } + + /// Set the CCDB validity start timestamp + /// When using precise timestamps, back-calculates the orbit-reset-referenced wall-clock time for the first real TF in the interval using the orbit-reset time forwarded by distribute lane 0. + /// orbitStep is the dynamically measured per-sub-TF stride; when non-zero it is preferred over the GRP NHBFPerTF for the orbit-offset calculation. + void setTimestampCCDB(const long relTF, const uint32_t orbitStep, o2::framework::ProcessingContext& pc) + { + if (mUsePreciseTimestamp && !mTFInfo.second) { + return; + } + const auto& tinfo = pc.services().get(); + // prefer the measured stride; fall back to NHBFPerTF from GRPECS + const int nHBFPerTF = (orbitStep > 0) ? static_cast(orbitStep) : o2::base::GRPGeomHelper::instance().getNHBFPerTF(); + const auto nOrbitsOffset = (relTF * mNTFsBuffer + (mNTFsBuffer - 1)) * nHBFPerTF; + mTimestampStart = mUsePreciseTimestamp ? (mTFInfo.first + (tinfo.firstTForbit - nOrbitsOffset) * o2::constants::lhc::LHCOrbitMUS * 0.001) : tinfo.creation; + LOGP(detail, "Setting time stamp reset reference to: {}, at tfCounter: {}, firstTForbit: {}, NHBFPerTF: {}, relTF: {}, nOrbitsOffset: {}", + mTFInfo.first, tinfo.tfCounter, tinfo.firstTForbit, nHBFPerTF, relTF, nOrbitsOffset); + } + + /// Unpack and fill the TTree for all relTF slots that have been buffered during run(). + /// When includeIncomplete=false (normal interval end) only fully-received batches are filled. + /// When includeIncomplete=true (EOS flush) partial batches are also flushed with a warning. + void materializeBufferedTFs(const bool includeIncomplete) + { + for (unsigned int relTF = 0; relTF < mTimeFrames; ++relTF) { + if (mProcessedCRU[relTF] == 0) { + continue; + } + + if ((mProcessedCRU[relTF] != mCRUs.size()) && !includeIncomplete) { + continue; + } + + if ((mProcessedCRU[relTF] != mCRUs.size()) && includeIncomplete) { + LOGP(warning, "Aggregate lane {} flushing incomplete CMV batch relTF {} at EOS: received {} CRUs out of {}", mLaneId, relTF, mProcessedCRU[relTF], mCRUs.size()); + } + + if (!mHasIntervalFirstTF) { + mIntervalFirstTF = mTFFirst == -1 ? 0 : mTFFirst; + mHasIntervalFirstTF = true; + } + + // derive the actual number of sub-TFs from the buffer size; fall back to mNTFsBuffer if empty + const auto maxBufferSize = getMaxBufferSize(mRawCMVs[relTF]); + const int nTFsInBatch = maxBufferSize ? std::max(1, static_cast(maxBufferSize / cmv::NTimeBinsPerTF)) : mNTFsBuffer; + // fall back to GRP NHBFPerTF only if no orbit stride was measured for this relTF + const auto orbitStep = mOrbitStep[relTF] ? mOrbitStep[relTF] : static_cast(o2::base::GRPGeomHelper::instance().getNHBFPerTF()); + appendBatchToTree(mRawCMVs[relTF], mOrbitInfo[relTF], orbitStep, nTFsInBatch); + } + } + + /// Unpack and fill the TTree from the EOS partial-batch buffer (mEOSRawCMVs). + /// The number of real TFs is inferred from the raw buffer size divided by NTimeBinsPerTF. + /// Uses mLastOrbitStep from the last complete batch as the orbit stride fallback. + void materializeEOSBuffer() + { + if (mEOSRawCMVs.empty()) { + return; + } + + const auto maxBufferSize = getMaxBufferSize(mEOSRawCMVs); + const int nTFsInBatch = static_cast(maxBufferSize / cmv::NTimeBinsPerTF); + if (nTFsInBatch <= 0) { + return; + } + + if (!mHasIntervalFirstTF) { + mIntervalFirstTF = mLastSeenTF + 1; + mHasIntervalFirstTF = true; + } + + const uint64_t orbitInfo = (static_cast(mEOSFirstOrbit) << 32) | static_cast(mEOSFirstBC); + // use the actual stride seen in run(); fall back to GRP only if no complete batch was seen + const auto orbitStep = mLastOrbitStep ? mLastOrbitStep : static_cast(o2::base::GRPGeomHelper::instance().getNHBFPerTF()); + appendBatchToTree(mEOSRawCMVs, orbitInfo, orbitStep, nTFsInBatch); + mLastSeenTF += static_cast(nTFsInBatch); + } + + static size_t getMaxBufferSize(const std::unordered_map>& rawCMVs) + { + size_t maxBufferSize = 0; + for (const auto& [cru, values] : rawCMVs) { + maxBufferSize = std::max(maxBufferSize, values.size()); + } + return maxBufferSize; + } + + /// Unpack nTFsInBatch real TFs from rawCMVs, apply preprocessing (rounding, zeroing, trimming), + /// optionally compress them, and fill one TTree entry per real TF. + /// Processing is parallelised across nThreads workers using std::thread (each thread owns a disjoint chunk). + void appendBatchToTree(const std::unordered_map>& rawCMVs, const uint64_t orbitInfo, const uint32_t orbitStep, const int nTFsInBatch) + { + if (nTFsInBatch <= 0) { + return; + } + + const auto firstOrbit = static_cast(orbitInfo >> 32); + const auto firstBC = static_cast(orbitInfo & 0xFFFFu); + const uint8_t flags = buildCompressionFlags(); + std::vector prepared(nTFsInBatch); + const int nThreads = std::max(1, std::min(mThreads, nTFsInBatch)); + const int chunkSize = (nTFsInBatch + nThreads - 1) / nThreads; + + auto worker = [&](const int iThread) { + const int beginTF = iThread * chunkSize; + const int endTF = std::min(nTFsInBatch, beginTF + chunkSize); + for (int tfIndex = beginTF; tfIndex < endTF; ++tfIndex) { + + auto& preparedTF = prepared[tfIndex]; + preparedTF.tf.firstOrbit = firstOrbit + static_cast(tfIndex) * orbitStep; + preparedTF.tf.firstBC = (tfIndex == 0) ? firstBC : 0; + + for (const auto& [cru, values] : rawCMVs) { + const uint32_t offset = static_cast(tfIndex) * cmv::NTimeBinsPerTF; + if (offset >= static_cast(values.size())) { + continue; + } + const uint32_t nBins = std::min(static_cast(values.size()) - offset, cmv::NTimeBinsPerTF); + for (uint32_t tb = 0; tb < nBins; ++tb) { + preparedTF.tf.mDataPerTF[cru * cmv::NTimeBinsPerTF + tb] = values[offset + tb]; + } + } + + preparedTF.tf.roundToIntegers(mRoundIntegersThreshold); + if (mZeroThreshold > 0.f) { + preparedTF.tf.zeroSmallValues(mZeroThreshold); + } + if (mDynamicPrecisionSigma > 0.f) { + preparedTF.tf.trimGaussianPrecision(mDynamicPrecisionMean, mDynamicPrecisionSigma); + } + if (flags != CMVEncoding::kNone) { + preparedTF.compressed = preparedTF.tf.compress(flags); + } + } + }; + + std::vector workers; + workers.reserve(nThreads - 1); + for (int iThread = 1; iThread < nThreads; ++iThread) { + workers.emplace_back(worker, iThread); + } + worker(0); + for (auto& thread : workers) { + thread.join(); + } + + for (int tfIndex = 0; tfIndex < nTFsInBatch; ++tfIndex) { + if (flags != CMVEncoding::kNone) { + mCurrentCompressedTF = std::move(prepared[tfIndex].compressed); + } else { + mCurrentTF = std::move(prepared[tfIndex].tf); + } + mIntervalTree->Fill(); + ++mIntervalTFCount; + } + } + + void sendOutput(DataAllocator& output) + { + using timer = std::chrono::high_resolution_clock; + + if (mIntervalTFCount == 0) { + LOGP(warning, "CMV interval is empty at sendOutput for lane {}, skipping", mLaneId); + return; + } + + const auto lastTF = mIntervalFirstTF + static_cast(mIntervalTFCount) - 1; + mIntervalTree->GetUserInfo()->Clear(); + mIntervalTree->GetUserInfo()->Add(new TParameter("firstTF", mIntervalFirstTF)); + mIntervalTree->GetUserInfo()->Add(new TParameter("lastTF", lastTF)); + + LOGP(detail, "CMVPerTF TTree lane {}: {} entries, firstTF={}, lastTF={}", mLaneId, mIntervalTFCount, mIntervalFirstTF, lastTF); + auto start = timer::now(); + + const bool writeToDisk = (mOutputDir != "none") && (mOutputDir != "/dev/null"); + if (writeToDisk) { + const std::string fname = fmt::format("{}CMV_timestamp{}.root", mOutputDir, mTimestampStart); + try { + CMVPerTF::writeToFile(fname, mIntervalTree); + LOGP(detail, "CMV file written to {}", fname); + } catch (const std::exception& e) { + LOGP(error, "Failed to write CMV file {}: {}", fname, e.what()); + } + } + + if (!mSendCCDB) { + if (!writeToDisk) { + LOGP(warning, "Neither CCDB output nor output-dir is enabled for aggregate lane {}, skipping CMV export", mLaneId); + } + return; + } + + // use the actual number of TFs (mIntervalTFCount) so the CCDB validity end is correct for partial last intervals + const int nHBFPerTF = o2::base::GRPGeomHelper::instance().getNHBFPerTF(); + const long timeStampEnd = mTimestampStart + static_cast(mIntervalTFCount * nHBFPerTF * o2::constants::lhc::LHCOrbitMUS * 1e-3); + if (timeStampEnd <= mTimestampStart) { + LOGP(warning, "Invalid CCDB timestamp range start:{} end:{}, skipping upload", mTimestampStart, timeStampEnd); + return; + } + + o2::ccdb::CcdbObjectInfo ccdbInfoCMV("TPC/Calib/CMV", "TTree", "CMV.root", {}, mTimestampStart, timeStampEnd); + auto image = o2::ccdb::CcdbApi::createObjectImage((mIntervalTree.get()), &ccdbInfoCMV); + // trim TMemFile zero-padding: GetSize() is block-rounded, GetEND() is the actual file end + { + TMemFile mf("trim", image->data(), static_cast(image->size()), "READ"); + image->resize(static_cast(mf.GetEND())); + mf.Close(); + } + + LOGP(detail, "Sending object {} / {} of size {} bytes, valid for {} : {}", ccdbInfoCMV.getPath(), ccdbInfoCMV.getFileName(), image->size(), ccdbInfoCMV.getStartValidityTimestamp(), ccdbInfoCMV.getEndValidityTimestamp()); + output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBPayload, getDataDescriptionCCDBCMV(), 0}, *image); + output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBWrapper, getDataDescriptionCCDBCMV(), 0}, ccdbInfoCMV); + + auto stop = timer::now(); + std::chrono::duration elapsed = stop - start; + LOGP(detail, "CMV CCDB serialisation time: {:.3f} s", elapsed.count()); + } + + /// Reset all per-interval state after a successful sendOutput(); prepares for the next interval + void reset() + { + mTFFirst = -1; + mTimestampStart = 0; + mIntervalFirstTF = 0; + mHasIntervalFirstTF = false; + mProcessedTFs = 0; + std::fill(mProcessedCRU.begin(), mProcessedCRU.end(), 0); + std::fill(mOrbitInfo.begin(), mOrbitInfo.end(), 0); + std::fill(mOrbitStep.begin(), mOrbitStep.end(), 0); + std::fill(mOrbitInfoSeen.begin(), mOrbitInfoSeen.end(), false); + std::fill(mTFCompleted.begin(), mTFCompleted.end(), false); + for (auto& processedMap : mProcessedCRUs) { + for (auto& [cru, seen] : processedMap) { + seen = false; + } + } + for (auto& rawPerTF : mRawCMVs) { + rawPerTF.clear(); + } + mEOSRawCMVs.clear(); + mEOSFirstOrbit = 0; + mEOSFirstBC = 0; + mLastOrbitStep = 0; + mLastSeenTF = 0; + mIntervalTFCount = 0; + mCurrentTF = CMVPerTF{}; + mCurrentCompressedTF = CMVPerTFCompressed{}; + initIntervalTree(); + } +}; + +/// Build a DataProcessorSpec for one aggregate lane +/// Each lane receives CMV data from one distribute output lane (matched by lane index) and expects the full CRU list — the distribute stage already routes per-CRU data to the correct lane +inline DataProcessorSpec getTPCAggregateCMVSpec(const int lane, + const std::vector& crus, + const unsigned int timeframes, + const bool sendCCDB, + const bool usePreciseTimestamp, + const int nTFsBuffer = 1) +{ + std::vector outputSpecs; + if (sendCCDB) { + outputSpecs.emplace_back(ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBPayload, TPCAggregateCMVDevice::getDataDescriptionCCDBCMV()}, Lifetime::Sporadic); + outputSpecs.emplace_back(ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBWrapper, TPCAggregateCMVDevice::getDataDescriptionCCDBCMV()}, Lifetime::Sporadic); + } + + std::vector inputSpecs; + inputSpecs.emplace_back(InputSpec{"cmvagg", ConcreteDataTypeMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(lane)}, Lifetime::Sporadic}); + inputSpecs.emplace_back(InputSpec{"cmvorbit", gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(lane), header::DataHeader::SubSpecificationType{static_cast(lane)}, Lifetime::Sporadic}); + inputSpecs.emplace_back(InputSpec{"firstTF", gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{static_cast(lane)}, Lifetime::Sporadic}); + if (usePreciseTimestamp) { + inputSpecs.emplace_back(InputSpec{"orbitreset", gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{static_cast(lane)}, Lifetime::Sporadic}); + } + + // Request GRPECS from CCDB so that GRPGeomHelper::getNHBFPerTF() is valid in this (separate) process + auto ccdbRequest = std::make_shared(false, // orbitResetTime + true, // GRPECS (NHBFPerTF) + false, // GRPLHCIF + false, // GRPMagField + false, // askMatLUT + o2::base::GRPGeomRequest::None, // geometry + inputSpecs); + + DataProcessorSpec spec{ + fmt::format("tpc-aggregate-cmv-{:02}", lane).data(), + inputSpecs, + outputSpecs, + AlgorithmSpec{adaptFromTask(lane, crus, timeframes, sendCCDB, usePreciseTimestamp, nTFsBuffer, ccdbRequest)}, + Options{{"output-dir", VariantType::String, "none", {"CMV output directory, must exist"}}, + {"nthreads-compression", VariantType::Int, 1, {"Number of threads used for CMV per timeframe preprocessing and compression"}}, + {"use-sparse", VariantType::Bool, false, {"Sparse encoding (skip zero time bins). Alone: raw uint16 values. With --use-compression-varint: varint exact values. With --use-compression-huffman: Huffman exact values"}}, + {"use-compression-varint", VariantType::Bool, false, {"Delta+zigzag+varint compression (all values). Combined with --use-sparse: sparse positions + varint encoded exact CMV values"}}, + {"use-compression-huffman", VariantType::Bool, false, {"Huffman encoding. Combined with --use-sparse: sparse positions + Huffman-encoded exact CMV values"}}, + {"cmv-zero-threshold", VariantType::Float, 0.f, {"Zero out CMV values whose float magnitude is below this threshold after optional integer rounding and before compression; 0 disables"}}, + {"cmv-round-integers-threshold", VariantType::Int, 0, {"Round values to nearest integer ADC for |v| <= N ADC before compression; 0 disables"}}, + {"cmv-dynamic-precision-mean", VariantType::Float, 1.f, {"Gaussian centre in |CMV| ADC where the strongest fractional bit trimming is applied"}}, + {"cmv-dynamic-precision-sigma", VariantType::Float, 0.f, {"Gaussian width in ADC for smooth CMV fractional bit trimming; 0 disables"}}}}; + spec.rank = lane; + return spec; +} + +} // namespace o2::tpc + +#endif diff --git a/Detectors/TPC/workflow/include/TPCWorkflow/TPCDistributeCMVSpec.h b/Detectors/TPC/workflow/include/TPCWorkflow/TPCDistributeCMVSpec.h index c1744ce86d3ac..2a351ad91bf05 100644 --- a/Detectors/TPC/workflow/include/TPCWorkflow/TPCDistributeCMVSpec.h +++ b/Detectors/TPC/workflow/include/TPCWorkflow/TPCDistributeCMVSpec.h @@ -11,15 +11,17 @@ /// @file TPCDistributeCMVSpec.h /// @author Tuba Gündem, tuba.gundem@cern.ch -/// @brief TPC aggregation of grouped CMVs +/// @brief TPC distribution of grouped CMVs towards the CMV aggregation workflow #ifndef O2_TPCDISTRIBUTECMVSPEC_H #define O2_TPCDISTRIBUTECMVSPEC_H +#include +#include +#include +#include #include -#include #include -#include "TParameter.h" #include "Framework/Task.h" #include "Framework/ControlService.h" #include "Framework/Logger.h" @@ -33,12 +35,6 @@ #include "TPCWorkflow/ProcessingHelpers.h" #include "DetectorsBase/GRPGeomHelper.h" #include "CommonDataFormat/Pair.h" -#include "TMemFile.h" -#include "CCDB/CcdbApi.h" -#include "CCDB/CcdbObjectInfo.h" -#include "DetectorsCalibration/Utils.h" -#include "TPCCalibration/CMVContainer.h" -#include "DataFormatsTPC/CMV.h" using namespace o2::framework; using o2::header::gDataOriginTPC; @@ -50,20 +46,25 @@ namespace o2::tpc class TPCDistributeCMVSpec : public o2::framework::Task { public: - TPCDistributeCMVSpec(const std::vector& crus, const unsigned int timeframes, const int nTFsBuffer, const int firstTF, const bool sendCCDB, const bool usePreciseTimestamp, std::shared_ptr req) + TPCDistributeCMVSpec(const std::vector& crus, const unsigned int timeframes, const int nTFsBuffer, const unsigned int outlanes, const int firstTF, std::shared_ptr req) : mCRUs{crus}, mTimeFrames{timeframes}, mNTFsBuffer{nTFsBuffer}, + mOutLanes{outlanes}, mProcessedCRU{{std::vector(timeframes), std::vector(timeframes)}}, - mTFStart{{firstTF, firstTF + timeframes}}, - mTFEnd{{firstTF + timeframes - 1, mTFStart[1] + timeframes - 1}}, + mTFStart{{firstTF, firstTF + static_cast(timeframes) * nTFsBuffer}}, + mTFEnd{{firstTF + static_cast(timeframes) * nTFsBuffer - 1, firstTF + 2LL * timeframes * nTFsBuffer - 1}}, mCCDBRequest(req), - mSendCCDB{sendCCDB}, - mUsePreciseTimestamp{usePreciseTimestamp}, - mSendCCDBOutputOrbitReset(1), - mSendCCDBOutputGRPECS(1), + mSendCCDBOutputOrbitReset(outlanes), + mSendCCDBOutputGRPECS(outlanes), mOrbitInfoForwarded{{std::vector(timeframes, false), std::vector(timeframes, false)}} { + mDataDescrOut.reserve(mOutLanes); + mOrbitDescrOut.reserve(mOutLanes); + for (unsigned int i = 0; i < mOutLanes; ++i) { + mDataDescrOut.emplace_back(getDataDescriptionCMV(i)); + mOrbitDescrOut.emplace_back(getDataDescriptionCMVOrbitInfo(i)); + } // sort vector for binary_search std::sort(mCRUs.begin(), mCRUs.end()); @@ -79,10 +80,7 @@ class TPCDistributeCMVSpec : public o2::framework::Task mFilter.emplace_back(InputSpec{"cmvsgroup", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup()}, Lifetime::Sporadic}); mOrbitFilter.emplace_back(InputSpec{"cmvorbit", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVOrbitInfo()}, Lifetime::Sporadic}); - - // pre-allocate the accumulator TTree for the current aggregation interval - initIntervalTree(); - }; + } void init(o2::framework::InitContext& ic) final { @@ -97,33 +95,21 @@ class TPCDistributeCMVSpec : public o2::framework::Task } mNTFsDataDrop = mCheckEveryNData; } - mDumpCMVs = ic.options().get("dump-cmvs"); - mUseCompressionVarint = ic.options().get("use-compression-varint"); - mUseSparse = ic.options().get("use-sparse"); - mUseCompressionHuffman = ic.options().get("use-compression-huffman"); - mRoundIntegersThreshold = static_cast(ic.options().get("cmv-round-integers-threshold")); - mZeroThreshold = ic.options().get("cmv-zero-threshold"); - mDynamicPrecisionMean = ic.options().get("cmv-dynamic-precision-mean"); - mDynamicPrecisionSigma = ic.options().get("cmv-dynamic-precision-sigma"); - LOGP(info, "CMV compression settings: use-compression-varint={}, use-sparse={}, use-compression-huffman={}, cmv-round-integers-threshold={}, cmv-zero-threshold={}, cmv-dynamic-precision-mean={}, cmv-dynamic-precision-sigma={}", - mUseCompressionVarint, mUseSparse, mUseCompressionHuffman, mRoundIntegersThreshold, mZeroThreshold, mDynamicPrecisionMean, mDynamicPrecisionSigma); - // re-initialise the interval tree now that compression options are known (constructor used the defaults) - initIntervalTree(); } void finaliseCCDB(ConcreteDataMatcher& matcher, void* obj) final { o2::base::GRPGeomHelper::instance().finaliseCCDB(matcher, obj); if (matcher == ConcreteDataMatcher("CTP", "ORBITRESET", 0)) { - LOGP(info, "Updating ORBITRESET"); + LOGP(debug, "Updating ORBITRESET"); std::fill(mSendCCDBOutputOrbitReset.begin(), mSendCCDBOutputOrbitReset.end(), true); } else if (matcher == ConcreteDataMatcher("GLO", "GRPECS", 0)) { // check if received object is valid if (o2::base::GRPGeomHelper::instance().getGRPECS()->getRun() != 0) { - LOGP(info, "Updating GRPECS"); + LOGP(debug, "Updating GRPECS"); std::fill(mSendCCDBOutputGRPECS.begin(), mSendCCDBOutputGRPECS.end(), true); } else { - LOGP(info, "Detected default GRPECS object"); + LOGP(debug, "Detected default GRPECS object"); } } } @@ -143,44 +129,43 @@ class TPCDistributeCMVSpec : public o2::framework::Task if (pc.inputs().countValidInputs() == (grpecsValid + orbitResetValid)) { return; } - // update mTFInfo from GRPGeomHelper whenever orbit-reset or GRPECS objects are fresh - if (mSendCCDBOutputOrbitReset[0] && mSendCCDBOutputGRPECS[0]) { - mSendCCDBOutputOrbitReset[0] = false; - mSendCCDBOutputGRPECS[0] = false; - mTFInfo = dataformats::Pair{o2::base::GRPGeomHelper::instance().getOrbitResetTimeMS(), o2::base::GRPGeomHelper::instance().getNHBFPerTF()}; - } } const auto tf = processing_helpers::getCurrentTF(pc); - mLastSeenTF = tf; // track for endOfStream flush + if (tf == std::numeric_limits::max()) { + forwardEOSData(pc); + return; + } // automatically detect firstTF in case firstTF was not specified if (mTFStart.front() <= -1) { - const auto firstTF = tf; + const auto firstTFDetected = tf; const long offsetTF = std::abs(mTFStart.front() + 1); const auto nTotTFs = getNRealTFs(); - mTFStart = {firstTF + offsetTF, firstTF + offsetTF + nTotTFs}; + // tf is the batch TF counter (= last real TF in the first batch), subtract (mNTFsBuffer - 1) to recover the actual first real TF of the interval + const long firstRealTF = static_cast(firstTFDetected) - (mNTFsBuffer - 1) + offsetTF; + mTFStart = {firstRealTF, firstRealTF + nTotTFs}; mTFEnd = {mTFStart[1] - 1, mTFStart[1] - 1 + nTotTFs}; - LOGP(info, "Setting {} as first TF", mTFStart[0]); - LOGP(info, "Using offset of {} TFs for setting the first TF", offsetTF); + LOGP(detail, "Setting {} as first TF", mTFStart[0]); + LOGP(detail, "Using offset of {} TFs for setting the first TF", offsetTF); } // check which buffer to use for current incoming data const bool currentBuffer = (tf > mTFEnd[mBuffer]) ? !mBuffer : mBuffer; if (mTFStart[currentBuffer] > tf) { - LOGP(info, "All CRUs for current TF {} already received. Skipping this TF", tf); + LOGP(detail, "All CRUs for current TF {} already received. Skipping this TF", tf); return; } + const unsigned int currentOutLane = getOutLane(tf); const unsigned int relTF = (tf - mTFStart[currentBuffer]) / mNTFsBuffer; - LOGP(info, "Current TF: {}, relative TF: {}, current buffer: {}, mTFStart: {}", tf, relTF, currentBuffer, mTFStart[currentBuffer]); + LOGP(debug, "Current TF: {}, relative TF: {}, current buffer: {}, current output lane: {}, mTFStart: {}", tf, relTF, currentBuffer, currentOutLane, mTFStart[currentBuffer]); if (relTF >= mProcessedCRU[currentBuffer].size()) { LOGP(warning, "Skipping tf {}: relative tf {} is larger than size of buffer: {}", tf, relTF, mProcessedCRU[currentBuffer].size()); - // check number of processed CRUs for previous TFs. If CRUs are missing for them, they are probably lost/not received mProcessedTotalData = mCheckEveryNData; - checkIntervalsForMissingData(pc, currentBuffer, relTF, tf); + checkIntervalsForMissingData(pc, currentBuffer, relTF, currentOutLane, tf); return; } @@ -188,33 +173,18 @@ class TPCDistributeCMVSpec : public o2::framework::Task return; } - // record the absolute first TF of this aggregation interval - if (mIntervalTFCount == 0) { - mIntervalFirstTF = tf; + if (mSendOutputStartInfo[currentBuffer]) { + mSendOutputStartInfo[currentBuffer] = false; + pc.outputs().snapshot(Output{gDataOriginTPC, getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{currentOutLane}}, mTFStart[currentBuffer]); } - // set CCDB start timestamp once at the start of each aggregation interval - if (mTimestampStart == 0) { - setTimestampCCDB(relTF, pc); + if (mSendCCDBOutputOrbitReset[currentOutLane] && mSendCCDBOutputGRPECS[currentOutLane]) { + mSendCCDBOutputOrbitReset[currentOutLane] = false; + mSendCCDBOutputGRPECS[currentOutLane] = false; + pc.outputs().snapshot(Output{gDataOriginTPC, getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{currentOutLane}}, dataformats::Pair{o2::base::GRPGeomHelper::instance().getOrbitResetTimeMS(), o2::base::GRPGeomHelper::instance().getNHBFPerTF()}); } - // capture orbit/BC info into the interval once per relTF. - // all CRUs within a TF carry identical timing, so the first one is sufficient. - if (!mOrbitInfoForwarded[currentBuffer][relTF]) { - for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) { - auto const* hdr = o2::framework::DataRefUtils::getHeader(ref); - const unsigned int cru = hdr->subSpecification >> 7; - if (std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) { - const auto orbitBC = pc.inputs().get(ref); - if (mCurrentTF.firstOrbit == 0 && mCurrentTF.firstBC == 0) { - mCurrentTF.firstOrbit = static_cast(orbitBC >> 32); - mCurrentTF.firstBC = static_cast(orbitBC & 0xFFFFu); - } - mOrbitInfoForwarded[currentBuffer][relTF] = true; - break; // one per relTF is enough - } - } - } + forwardOrbitInfo(pc, currentBuffer, relTF, currentOutLane); for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) { auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader(ref); @@ -222,79 +192,43 @@ class TPCDistributeCMVSpec : public o2::framework::Task // check if cru is specified in input cru list if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) { - LOGP(info, "Received data from CRU: {} which was not specified as input. Skipping", cru); + LOGP(debug, "Received data from CRU: {} which was not specified as input. Skipping", cru); continue; } if (mProcessedCRUs[currentBuffer][relTF][cru]) { continue; - } else { - // count total number of processed CRUs for given TF - ++mProcessedCRU[currentBuffer][relTF]; - - // to keep track of processed CRUs - mProcessedCRUs[currentBuffer][relTF][cru] = true; } + // count total number of processed CRUs for given TF + ++mProcessedCRU[currentBuffer][relTF]; + // to keep track of processed CRUs + mProcessedCRUs[currentBuffer][relTF][cru] = true; - // accumulate raw 16-bit CMVs into the flat array for the current TF - auto cmvVec = pc.inputs().get>(ref); - const uint32_t nTimeBins = std::min(static_cast(cmvVec.size()), cmv::NTimeBinsPerTF); - for (uint32_t tb = 0; tb < nTimeBins; ++tb) { - mCurrentTF.mDataPerTF[cru * cmv::NTimeBinsPerTF + tb] = cmvVec[tb]; - } + sendOutput(pc, currentOutLane, cru, pc.inputs().get>(ref)); } - LOGP(info, "Number of received CRUs for current TF: {} Needed a total number of processed CRUs of: {} Current TF: {}", mProcessedCRU[currentBuffer][relTF], mCRUs.size(), tf); + LOGP(detail, "Number of received CRUs for current TF: {} Needed a total number of processed CRUs of: {} Current TF: {}", mProcessedCRU[currentBuffer][relTF], mCRUs.size(), tf); // check for missing data if specified if (mNTFsDataDrop > 0) { - checkIntervalsForMissingData(pc, currentBuffer, relTF, tf); + checkIntervalsForMissingData(pc, currentBuffer, relTF, currentOutLane, tf); } if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) { ++mProcessedTFs[currentBuffer]; - - // Pre-processing: quantisation / rounding / zeroing (applied before compression) - mCurrentTF.roundToIntegers(mRoundIntegersThreshold); - if (mZeroThreshold > 0.f) { - mCurrentTF.zeroSmallValues(mZeroThreshold); - } - if (mDynamicPrecisionSigma > 0.f) { - mCurrentTF.trimGaussianPrecision(mDynamicPrecisionMean, mDynamicPrecisionSigma); - } - - // Compress; the raw CMVPerTF branch is used when all flags are zero - const uint8_t flags = buildCompressionFlags(); - if (flags != CMVEncoding::kNone) { - mCurrentCompressedTF = mCurrentTF.compress(flags); - } - - mIntervalTree->Fill(); - ++mIntervalTFCount; - mCurrentTF = CMVPerTF{}; } if (mProcessedTFs[currentBuffer] == mTimeFrames) { - sendOutput(pc.outputs(), tf); - finishInterval(pc, currentBuffer, tf); + finishInterval(pc, currentOutLane, currentBuffer, tf); } } - void endOfStream(o2::framework::EndOfStreamContext& ec) final - { - LOGP(info, "End of stream, flushing CMV interval ({} TFs)", mIntervalTFCount); - // correct mTFEnd for the partial last interval so the CCDB validity end timestamp reflects the actual last TF, not the expected interval end - mTFEnd[mBuffer] = mLastSeenTF; - sendOutput(ec.outputs(), mLastSeenTF); - ec.services().get().readyToQuit(QuitRequest::Me); - } - - static constexpr header::DataDescription getDataDescriptionCCDBCMV() { return header::DataDescription{"TPC_CMV"}; } + void endOfStream(o2::framework::EndOfStreamContext& ec) final { ec.services().get().readyToQuit(QuitRequest::Me); } /// Return data description for aggregated CMVs for a given lane static header::DataDescription getDataDescriptionCMV(const unsigned int lane) { - const std::string name = fmt::format("CMVAGG{}", lane).data(); + const std::string name = fmt::format("CMVAGG{}", lane); header::DataDescription description; description.runtimeInit(name.substr(0, 16).c_str()); return description; @@ -315,276 +249,211 @@ class TPCDistributeCMVSpec : public o2::framework::Task private: std::vector mCRUs{}; ///< CRUs to process in this instance const unsigned int mTimeFrames{}; ///< number of TFs per aggregation interval - const int mNTFsBuffer{1}; ///< number of TFs for which the CMVs will be buffered - std::array mProcessedTFs{{0, 0}}; ///< number of processed time frames to keep track of when the writing to CCDB will be done - std::array, 2> mProcessedCRU{}; ///< counter of received data from CRUs per TF to merge incoming data from FLPs. Buffer used in case one FLP delivers the TF after the last TF for the current aggregation interval faster then the other FLPs the last TF. - std::array>, 2> mProcessedCRUs{}; ///< to keep track of the already processed CRUs ([buffer][relTF][CRU]) - std::array mTFStart{}; ///< storing of first TF for buffer interval - std::array mTFEnd{}; ///< storing of last TF for buffer interval - std::shared_ptr mCCDBRequest; ///< info for CCDB request - std::vector mSendCCDBOutputOrbitReset{}; ///< flag for received orbit reset time from CCDB - std::vector mSendCCDBOutputGRPECS{}; ///< flag for received orbit GRPECS from CCDB - bool mBuffer{false}; ///< buffer index - bool mSendCCDB{false}; ///< send output to CCDB populator - bool mUsePreciseTimestamp{false}; ///< use precise timestamp from orbit-reset info - bool mDumpCMVs{false}; ///< write a local ROOT debug file - bool mUseCompressionVarint{false}; ///< use delta+zigzag+varint compression (all values, no sparse skip); combined with mUseSparse → SparseV2 mode 1 - bool mUseSparse{false}; ///< sparse encoding; alone = raw uint16 values; combined with varint/Huffman flag → SparseV2 - bool mUseCompressionHuffman{false}; ///< Huffman encoding; combined with mUseSparse → SparseV2 mode 2 - uint16_t mRoundIntegersThreshold{0}; ///< round values to nearest integer ADC for |v| <= N ADC; 0 = disabled - float mZeroThreshold{0.f}; ///< zero out CMV values whose float magnitude is below this threshold; 0 = disabled - float mDynamicPrecisionMean{1.f}; ///< Gaussian centre in |CMV| ADC where the strongest fractional-bit trimming is applied - float mDynamicPrecisionSigma{0.f}; ///< Gaussian width in ADC for the fractional-bit trimming; 0 disables - long mTimestampStart{0}; ///< CCDB validity start timestamp - dataformats::Pair mTFInfo{}; ///< orbit-reset time and NHBFPerTF for precise timestamp - std::unique_ptr mIntervalTree{}; ///< TTree accumulating one entry per completed TF in the current interval - CMVPerTF mCurrentTF{}; ///< staging object filled per CRU before compression - CMVPerTFCompressed mCurrentCompressedTF{}; ///< compressed output for the current TF (used when flags != kNone) - long mIntervalFirstTF{0}; ///< absolute TF counter of the first TF in the current aggregation interval - unsigned int mIntervalTFCount{0}; ///< number of TTree entries filled for the current aggregation interval - int mNFactorTFs{0}; ///< Number of TFs to skip for sending oldest TF - int mNTFsDataDrop{0}; ///< delay for the check if TFs are missing in TF units - std::array mStartNTFsDataDrop{0}; ///< first relative TF to check - long mProcessedTotalData{0}; ///< used to check for dropeed TF data - int mCheckEveryNData{1}; ///< factor after which to check for missing data (in case data missing -> send dummy data) - std::vector mFilter{}; ///< filter for looping over input data - std::vector mOrbitFilter{}; ///< filter for CMVORBITINFO from FLP - std::array, 2> mOrbitInfoForwarded{}; ///< tracks whether orbit/BC has been captured per (buffer, relTF) - uint32_t mLastSeenTF{0}; ///< last TF counter seen in run(), used to set lastTF in endOfStream flush - - /// Returns real number of TFs taking buffer size into account + const int mNTFsBuffer{1}; ///< number of TFs for which the CMVs will be buffered (must match TPCFLPCMVSpec) + const unsigned int mOutLanes{}; ///< number of parallel aggregate pipelines this distributor feeds + std::array mProcessedTFs{{0, 0}}; ///< number of processed timeframes per buffer; triggers sendOutput when it reaches mTimeFrames + std::array, 2> mProcessedCRU{}; ///< counter of received CRUs per (buffer, relTF); used to detect when a relTF is complete + std::array>, 2> mProcessedCRUs{}; ///< per-CRU received flag ([buffer][relTF][CRU]); prevents double-counting when a CRU re-sends + std::array mTFStart{}; ///< absolute TF counter of the first TF in each buffer interval + std::array mTFEnd{}; ///< absolute TF counter of the last TF in each buffer interval + std::array mSendOutputStartInfo{true, true}; ///< flag to send CMVFIRSTTF message once at the start of each buffer interval + std::shared_ptr mCCDBRequest; ///< info for CCDB request (orbit-reset and GRPECS, only on lane 0 when sendPreciseTimestamp=true) + std::vector mSendCCDBOutputOrbitReset{}; ///< per-output-lane flag: true when a fresh orbit-reset object has been received from CCDB + std::vector mSendCCDBOutputGRPECS{}; ///< per-output-lane flag: true when a fresh GRPECS object has been received from CCDB + unsigned int mCurrentOutLane{0}; ///< output lane currently being filled + bool mBuffer{false}; ///< double-buffer index (false = buffer 0, true = buffer 1) + int mNFactorTFs{0}; ///< number of TFs to skip when setting oldestForChannel; resets to 0 after first interval + int mNTFsDataDrop{0}; ///< delay (in relTF units) before declaring a relTF's missing CRUs as lost + std::array mStartNTFsDataDrop{0}; ///< first relative TF index to check for missing data in each buffer + long mProcessedTotalData{0}; ///< call counter used to throttle checkIntervalsForMissingData checks + int mCheckEveryNData{1}; ///< check for missing data every N run() calls (0 → default = mTimeFrames/2) + std::vector mFilter{}; ///< filter for looping over CMVGROUP input data from FLPs + std::vector mOrbitFilter{}; ///< filter for CMVORBITINFO input from FLPs + std::vector mDataDescrOut{}; ///< per-output-lane CMV data descriptions (CMVAGG0, CMVAGG1, …) + std::vector mOrbitDescrOut{}; ///< per-output-lane orbit-info data descriptions (CMVORB0, CMVORB1, …) + std::array, 2> mOrbitInfoForwarded{}; ///< tracks whether orbit/BC has been forwarded to the aggregate lane per (buffer, relTF) + + /// Returns the output aggregate lane for a given TF counter (advances when the current buffer interval has ended) + unsigned int getOutLane(const uint32_t tf) const { return (tf > mTFEnd[mBuffer]) ? (mCurrentOutLane + 1) % mOutLanes : mCurrentOutLane; } + /// Returns the total number of real TFs per buffer interval (= mNTFsBuffer * mTimeFrames) unsigned int getNRealTFs() const { return mNTFsBuffer * mTimeFrames; } - /// Build the CMVEncoding bitmask from the current option flags. - uint8_t buildCompressionFlags() const + void sendOutput(o2::framework::ProcessingContext& pc, const unsigned int currentOutLane, const unsigned int cru, o2::pmr::vector cmvs) { - uint8_t flags = CMVEncoding::kNone; - if (mUseSparse) { - flags |= CMVEncoding::kSparse; - } - if (mUseCompressionHuffman) { - flags |= CMVEncoding::kZigzag | CMVEncoding::kHuffman; - } else if (mUseCompressionVarint) { - flags |= CMVEncoding::kZigzag | CMVEncoding::kVarint; + pc.outputs().adoptContainer(Output{gDataOriginTPC, mDataDescrOut[currentOutLane], header::DataHeader::SubSpecificationType{cru}}, std::move(cmvs)); + } + + void sendOrbitInfo(o2::framework::ProcessingContext& pc, const unsigned int outLane, const uint64_t orbitInfo) + { + pc.outputs().snapshot(Output{gDataOriginTPC, mOrbitDescrOut[outLane], header::DataHeader::SubSpecificationType{outLane}}, orbitInfo); + } + + void forwardOrbitInfo(o2::framework::ProcessingContext& pc, const bool currentBuffer, const unsigned int relTF, const unsigned int currentOutLane) + { + if (mOrbitInfoForwarded[currentBuffer][relTF]) { + return; } - // Delta coding is only applied for the dense (non-sparse) path with a value compressor - if (!(flags & CMVEncoding::kSparse) && (flags & (CMVEncoding::kVarint | CMVEncoding::kHuffman))) { - flags |= CMVEncoding::kDelta; + + for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) { + auto const* hdr = o2::framework::DataRefUtils::getHeader(ref); + const unsigned int cru = hdr->subSpecification >> 7; + if (!std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) { + continue; + } + + sendOrbitInfo(pc, currentOutLane, pc.inputs().get(ref)); + mOrbitInfoForwarded[currentBuffer][relTF] = true; + break; } - return flags; } - /// Create a fresh in-memory TTree for the next aggregation interval. - /// Uses a single CMVPerTFCompressed branch whenever any compression is active, - /// or a raw CMVPerTF branch when no compression flags are set. - void initIntervalTree() + void forwardEOSData(o2::framework::ProcessingContext& pc) { - mIntervalTree = std::make_unique("ccdb_object", "ccdb_object"); - mIntervalTree->SetAutoSave(0); - mIntervalTree->SetDirectory(nullptr); - if (buildCompressionFlags() != CMVEncoding::kNone) { - mIntervalTree->Branch("CMVPerTFCompressed", &mCurrentCompressedTF); - } else { - mIntervalTree->Branch("CMVPerTF", &mCurrentTF); + const unsigned int currentOutLane = mCurrentOutLane; + + if (mSendOutputStartInfo[mBuffer] && (mTFStart[mBuffer] >= 0)) { + mSendOutputStartInfo[mBuffer] = false; + pc.outputs().snapshot(Output{gDataOriginTPC, getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{currentOutLane}}, mTFStart[mBuffer]); + } + + if (mSendCCDBOutputOrbitReset[currentOutLane] && mSendCCDBOutputGRPECS[currentOutLane]) { + mSendCCDBOutputOrbitReset[currentOutLane] = false; + mSendCCDBOutputGRPECS[currentOutLane] = false; + pc.outputs().snapshot(Output{gDataOriginTPC, getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{currentOutLane}}, dataformats::Pair{o2::base::GRPGeomHelper::instance().getOrbitResetTimeMS(), o2::base::GRPGeomHelper::instance().getNHBFPerTF()}); + } + + if (!mOrbitInfoForwarded[mBuffer].empty()) { + for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) { + auto const* hdr = o2::framework::DataRefUtils::getHeader(ref); + const unsigned int cru = hdr->subSpecification >> 7; + if (!std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) { + continue; + } + sendOrbitInfo(pc, currentOutLane, pc.inputs().get(ref)); + break; + } + } + + for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) { + auto const* hdr = o2::framework::DataRefUtils::getHeader(ref); + const unsigned int cru = hdr->subSpecification >> 7; + if (!std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) { + continue; + } + sendOutput(pc, currentOutLane, cru, pc.inputs().get>(ref)); } } void clearBuffer(const bool currentBuffer) { - // resetting received CRUs + // reset per-CRU received flags so the next interval can accept data from all CRUs again for (auto& crusMap : mProcessedCRUs[currentBuffer]) { for (auto& it : crusMap) { it.second = false; } } - mProcessedTFs[currentBuffer] = 0; // reset processed TFs for next aggregation interval + mProcessedTFs[currentBuffer] = 0; std::fill(mProcessedCRU[currentBuffer].begin(), mProcessedCRU[currentBuffer].end(), 0); std::fill(mOrbitInfoForwarded[currentBuffer].begin(), mOrbitInfoForwarded[currentBuffer].end(), false); - // set integration range for next integration interval mTFStart[mBuffer] = mTFEnd[!mBuffer] + 1; mTFEnd[mBuffer] = mTFStart[mBuffer] + getNRealTFs() - 1; - // switch buffer + // switch buffer and advance output lane mBuffer = !mBuffer; + mCurrentOutLane = ++mCurrentOutLane % mOutLanes; } - void checkIntervalsForMissingData(o2::framework::ProcessingContext& pc, const bool currentBuffer, const long relTF, const uint32_t tf) + void checkIntervalsForMissingData(o2::framework::ProcessingContext& pc, const bool currentBuffer, const long relTF, const unsigned int currentOutLane, const uint32_t tf) { if (!(mProcessedTotalData++ % mCheckEveryNData)) { - LOGP(info, "Checking for dropped packages..."); + LOGP(detail, "Checking for dropped packages..."); - // if last buffer has smaller time range check the whole last buffer + // if the last buffer has a smaller time range than expected, flush its remaining uncompleted TFs if ((mTFStart[currentBuffer] > mTFStart[!currentBuffer]) && (relTF > mNTFsDataDrop)) { LOGP(warning, "Checking last buffer from {} to {}", mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].size()); - checkMissingData(pc, !currentBuffer, mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].size()); - LOGP(info, "All empty TFs for TF {} for current buffer filled with dummy and sent. Clearing buffer", tf); - sendOutput(pc.outputs(), tf); - finishInterval(pc, !currentBuffer, tf); + const unsigned int lastLane = (currentOutLane == 0) ? (mOutLanes - 1) : (currentOutLane - 1); + checkMissingData(pc, !currentBuffer, mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].size(), lastLane); + LOGP(detail, "All empty TFs for TF {} for current buffer filled with dummy and sent. Clearing buffer", tf); + finishInterval(pc, lastLane, !currentBuffer, tf); } const int tfEndCheck = std::clamp(static_cast(relTF) - mNTFsDataDrop, 0, static_cast(mProcessedCRU[currentBuffer].size())); - LOGP(info, "Checking current buffer from {} to {}", mStartNTFsDataDrop[currentBuffer], tfEndCheck); - checkMissingData(pc, currentBuffer, mStartNTFsDataDrop[currentBuffer], tfEndCheck); + LOGP(detail, "Checking current buffer from {} to {}", mStartNTFsDataDrop[currentBuffer], tfEndCheck); + checkMissingData(pc, currentBuffer, mStartNTFsDataDrop[currentBuffer], tfEndCheck, currentOutLane); mStartNTFsDataDrop[currentBuffer] = tfEndCheck; } } - void checkMissingData(o2::framework::ProcessingContext& pc, const bool currentBuffer, const int startTF, const int endTF) + void checkMissingData(o2::framework::ProcessingContext& pc, const bool currentBuffer, const int startTF, const int endTF, const unsigned int outLane) { for (int iTF = startTF; iTF < endTF; ++iTF) { if (mProcessedCRU[currentBuffer][iTF] != mCRUs.size()) { - LOGP(warning, "CRUs for rel. TF: {} curr TF {} are missing! Processed {} CRUs out of {}", iTF, mTFStart[currentBuffer] + iTF, mProcessedCRU[currentBuffer][iTF], mCRUs.size()); + LOGP(warning, "CRUs for lane {} rel. TF: {} curr TF {} are missing! Processed {} CRUs out of {}", outLane, iTF, mTFStart[currentBuffer] + iTF, mProcessedCRU[currentBuffer][iTF], mCRUs.size()); ++mProcessedTFs[currentBuffer]; mProcessedCRU[currentBuffer][iTF] = mCRUs.size(); - // find missing CRUs and leave their interval slots empty (zero-filled) + // send empty payloads for missing CRUs so the aggregate lane sees a complete set for (auto& it : mProcessedCRUs[currentBuffer][iTF]) { if (!it.second) { it.second = true; + sendOutput(pc, outLane, it.first, pmr::vector()); } } - // leave orbit/BC as zero placeholder for missing TFs - mOrbitInfoForwarded[currentBuffer][iTF] = true; + // send zero orbit placeholder for missing TF so the aggregate lane can still reconstruct timing + if (!mOrbitInfoForwarded[currentBuffer][iTF]) { + sendOrbitInfo(pc, outLane, 0); + mOrbitInfoForwarded[currentBuffer][iTF] = true; + } } } } - void finishInterval(o2::framework::ProcessingContext& pc, const bool buffer, const uint32_t tf) + void finishInterval(o2::framework::ProcessingContext& pc, const unsigned int currentOutLane, const bool buffer, const uint32_t tf) { if (mNFactorTFs > 0) { mNFactorTFs = 0; - // ToDo: Find better fix - auto& deviceProxy = pc.services().get(); - if (deviceProxy.getNumOutputChannels() > 0) { - auto& state = deviceProxy.getOutputChannelState({0}); - size_t oldest = std::numeric_limits::max() - 1; // just set to really large value + // ToDo: Find better fix. Set oldestForChannel to a very large value so the DPL dispatcher does not block waiting for older TF data that will never arrive + for (unsigned int ilane = 0; ilane < mOutLanes; ++ilane) { + auto& deviceProxy = pc.services().get(); + auto& state = deviceProxy.getOutputChannelState({static_cast(ilane)}); + size_t oldest = std::numeric_limits::max() - 1; state.oldestForChannel = {oldest}; } } - LOGP(info, "All TFs {} for current buffer received. Clearing buffer", tf); + LOGP(detail, "All TFs {} for current buffer received. Clearing buffer", tf); clearBuffer(buffer); mStartNTFsDataDrop[buffer] = 0; - - // reset per-interval state for the next aggregation interval - initIntervalTree(); - mIntervalFirstTF = 0; - mIntervalTFCount = 0; - mCurrentTF = CMVPerTF{}; - mCurrentCompressedTF = CMVPerTFCompressed{}; - mTimestampStart = 0; - LOGP(info, "Everything cleared. Waiting for new data to arrive."); - } - - void setTimestampCCDB(const long relTF, o2::framework::ProcessingContext& pc) - { - if (mUsePreciseTimestamp && !mTFInfo.second) { - return; - } - const auto& tinfo = pc.services().get(); - const auto nOrbitsOffset = (relTF * mNTFsBuffer + (mNTFsBuffer - 1)) * mTFInfo.second; - mTimestampStart = mUsePreciseTimestamp - ? (mTFInfo.first + (tinfo.firstTForbit - nOrbitsOffset) * o2::constants::lhc::LHCOrbitMUS * 0.001) - : tinfo.creation; - LOGP(info, "Setting timestamp reset reference to: {}, at tfCounter: {}, firstTForbit: {}, NHBFPerTF: {}, relTF: {}, nOrbitsOffset: {}", - mTFInfo.first, tinfo.tfCounter, tinfo.firstTForbit, mTFInfo.second, relTF, nOrbitsOffset); - } - - void sendOutput(DataAllocator& output, const uint32_t tf) - { - using timer = std::chrono::high_resolution_clock; - - if (mIntervalTFCount == 0) { - LOGP(warning, "CMV interval is empty at sendOutput, skipping"); - return; - } - - // attach interval metadata to the TTree (stored once per tree) - mIntervalTree->GetUserInfo()->Clear(); - mIntervalTree->GetUserInfo()->Add(new TParameter("firstTF", mIntervalFirstTF)); - mIntervalTree->GetUserInfo()->Add(new TParameter("lastTF", mLastSeenTF)); - - LOGP(info, "CMVPerTF TTree: {} entries, firstTF={}, lastTF={}", mIntervalTFCount, mIntervalFirstTF, mLastSeenTF); - auto start = timer::now(); - - // write local ROOT file for debugging - if (mDumpCMVs) { - const std::string fname = fmt::format("CMV_timestamp{}.root", mTimestampStart); - try { - mCurrentTF.writeToFile(fname, mIntervalTree); - LOGP(info, "CMV debug file written to {}", fname); - } catch (const std::exception& e) { - LOGP(error, "Failed to write CMV debug file: {}", e.what()); - } - } - - if (!mSendCCDB) { - LOGP(warning, "CCDB output disabled, skipping upload!"); - return; - } - - const int nHBFPerTF = o2::base::GRPGeomHelper::instance().getNHBFPerTF(); - // use the actual number of TFs in this interval (mIntervalTFCount) rather than mTimeFrames, so the CCDB validity end is correct for partial last intervals - const long timeStampEnd = mTimestampStart + static_cast(mIntervalTFCount * mNTFsBuffer * nHBFPerTF * o2::constants::lhc::LHCOrbitMUS * 1e-3); - - if (timeStampEnd <= mTimestampStart) { - LOGP(warning, "Invalid CCDB timestamp range start:{} end:{}, skipping upload!", - mTimestampStart, timeStampEnd); - return; - } - - LOGP(info, "CCDB timestamp range start:{} end:{}", mTimestampStart, timeStampEnd); - - o2::ccdb::CcdbObjectInfo ccdbInfoCMV( - "TPC/Calib/CMV", - "TTree", - "CMV.root", - {}, - mTimestampStart, - timeStampEnd); - - auto image = o2::ccdb::CcdbApi::createObjectImage((mIntervalTree.get()), &ccdbInfoCMV); - // trim TMemFile zero-padding: GetSize() is block-rounded, GetEND() is the actual file end - { - TMemFile mf("trim", image->data(), static_cast(image->size()), "READ"); - image->resize(static_cast(mf.GetEND())); - mf.Close(); - } - LOGP(info, "Sending object {} / {} of size {} bytes, valid for {} : {}", - ccdbInfoCMV.getPath(), ccdbInfoCMV.getFileName(), image->size(), - ccdbInfoCMV.getStartValidityTimestamp(), ccdbInfoCMV.getEndValidityTimestamp()); - - output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBPayload, getDataDescriptionCCDBCMV(), 0}, *image); - output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBWrapper, getDataDescriptionCCDBCMV(), 0}, ccdbInfoCMV); - - auto stop = timer::now(); - std::chrono::duration elapsed = stop - start; - LOGP(info, "CMV CCDB serialisation time: {:.3f} s", elapsed.count()); + mSendOutputStartInfo[buffer] = true; } }; -DataProcessorSpec getTPCDistributeCMVSpec(const int ilane, const std::vector& crus, const unsigned int timeframes, const int firstTF, const bool sendCCDB = false, const bool usePreciseTimestamp = false, const int nTFsBuffer = 1) +DataProcessorSpec getTPCDistributeCMVSpec(const int ilane, const std::vector& crus, const unsigned int timeframes, const unsigned int outlanes, const int firstTF, const bool sendPrecisetimeStamp = false, const int nTFsBuffer = 1) { std::vector inputSpecs; inputSpecs.emplace_back(InputSpec{"cmvsgroup", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup()}, Lifetime::Sporadic}); inputSpecs.emplace_back(InputSpec{"cmvorbit", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVOrbitInfo()}, Lifetime::Sporadic}); std::vector outputSpecs; - if (sendCCDB) { - outputSpecs.emplace_back( - ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBPayload, - TPCDistributeCMVSpec::getDataDescriptionCCDBCMV()}, - Lifetime::Sporadic); - outputSpecs.emplace_back( - ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBWrapper, - TPCDistributeCMVSpec::getDataDescriptionCCDBCMV()}, - Lifetime::Sporadic); + outputSpecs.reserve(3 * outlanes); + for (unsigned int lane = 0; lane < outlanes; ++lane) { + outputSpecs.emplace_back(ConcreteDataTypeMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(lane)}, Lifetime::Sporadic); + outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(lane), header::DataHeader::SubSpecificationType{lane}}, Lifetime::Sporadic); + outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{lane}}, Lifetime::Sporadic); + } + + // Only lane 0 fetches CCDB orbit-reset/GRPECS objects and broadcasts them to all aggregate lanes, the other distribute lanes do not need them, avoiding redundant CCDB requests + bool fetchCCDB = false; + if (sendPrecisetimeStamp && (ilane == 0)) { + fetchCCDB = true; + for (unsigned int lane = 0; lane < outlanes; ++lane) { + outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{lane}}, Lifetime::Sporadic); + } } - const bool fetchCCDB = usePreciseTimestamp; auto ccdbRequest = std::make_shared(fetchCCDB, // orbitResetTime fetchCCDB, // GRPECS=true false, // GRPLHCIF @@ -593,25 +462,15 @@ DataProcessorSpec getTPCDistributeCMVSpec(const int ilane, const std::vector(crus, timeframes, nTFsBuffer, firstTF, sendCCDB, usePreciseTimestamp, ccdbRequest)}, - Options{{"drop-data-after-nTFs", VariantType::Int, 0, {"Number of TFs after which to drop the data"}}, - {"check-data-every-n", VariantType::Int, 0, {"Number of run function called after which to check for missing data (-1 for no checking, 0 for default checking)"}}, - {"nFactorTFs", VariantType::Int, 1000, {"Number of TFs to skip for sending oldest TF"}}, - {"dump-cmvs", VariantType::Bool, false, {"Dump CMVs to a local ROOT file for debugging"}}, - {"use-sparse", VariantType::Bool, false, {"Sparse encoding (skip zero time bins). Alone: raw uint16 values. With --use-compression-varint: varint exact values. With --use-compression-huffman: Huffman exact values"}}, - {"use-compression-varint", VariantType::Bool, false, {"Delta+zigzag+varint compression (all values). Combined with --use-sparse: sparse positions + varint encoded exact CMV values"}}, - {"use-compression-huffman", VariantType::Bool, false, {"Huffman encoding. Combined with --use-sparse: sparse positions + Huffman-encoded exact CMV values"}}, - {"cmv-zero-threshold", VariantType::Float, 0.f, {"Zero out CMV values whose float magnitude is below this threshold after optional integer rounding and before compression; 0 disables"}}, - {"cmv-round-integers-threshold", VariantType::Int, 0, {"Round values to nearest integer ADC for |v| <= N ADC before compression; 0 disables"}}, - {"cmv-dynamic-precision-mean", VariantType::Float, 1.f, {"Gaussian centre in |CMV| ADC where the strongest fractional bit trimming is applied"}}, - {"cmv-dynamic-precision-sigma", VariantType::Float, 0.f, {"Gaussian width in ADC for smooth CMV fractional bit trimming; 0 disables"}}}}; // end DataProcessorSpec - + AlgorithmSpec{adaptFromTask(crus, timeframes, nTFsBuffer, outlanes, firstTF, ccdbRequest)}, + Options{{"drop-data-after-nTFs", VariantType::Int, 0, {"Number of TFs after which to drop the data."}}, + {"check-data-every-n", VariantType::Int, 0, {"Number of run function called after which to check for missing data (-1 for no checking, 0 for default checking)."}}, + {"nFactorTFs", VariantType::Int, 1000, {"Number of TFs to skip for sending oldest TF."}}}}; spec.rank = ilane; return spec; } diff --git a/Detectors/TPC/workflow/src/tpc-aggregate-cmv.cxx b/Detectors/TPC/workflow/src/tpc-aggregate-cmv.cxx new file mode 100644 index 0000000000000..32d2317c3b9b0 --- /dev/null +++ b/Detectors/TPC/workflow/src/tpc-aggregate-cmv.cxx @@ -0,0 +1,86 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include +#include +#include "Algorithm/RangeTokenizer.h" +#include "Framework/WorkflowSpec.h" +#include "Framework/ConfigParamSpec.h" +#include "CommonUtils/ConfigurableParam.h" +#include "TPCWorkflow/TPCAggregateCMVSpec.h" +#include "Framework/CompletionPolicyHelpers.h" + +using namespace o2::framework; + +// customize the completion policy +void customize(std::vector& policies) +{ + using o2::framework::CompletionPolicy; + policies.push_back(CompletionPolicyHelpers::defineByName("tpc-aggregate-*.*", CompletionPolicy::CompletionOp::Consume)); +} + +// we need to add workflow options before including Framework/runDataProcessing +void customize(std::vector& workflowOptions) +{ + const std::string cruDefault = "0-" + std::to_string(o2::tpc::CRU::MaxCRU - 1); + + std::vector options{ + {"configFile", VariantType::String, "", {"Configuration file for configurable parameters"}}, + {"timeframes", VariantType::Int, 2000, {"Number of TFs aggregated per calibration interval"}}, + {"crus", VariantType::String, cruDefault.c_str(), {"List of CRUs, comma-separated ranges, e.g. 0-3,7,9-15"}}, + {"input-lanes", VariantType::Int, 1, {"Number of aggregate pipelines set by --output-lanes in TPCDistributeCMVSpec"}}, + {"use-precise-timestamp", VariantType::Bool, false, {"Use precise timestamp metadata from distribute when writing to CCDB"}}, + {"enable-CCDB-output", VariantType::Bool, false, {"Send output to the CCDB populator"}}, + {"n-TFs-buffer", VariantType::Int, 1, {"Buffer size that was set in TPCFLPCMVSpec"}}, + {"configKeyValues", VariantType::String, "", {"Semicolon-separated key=value strings"}}}; + + std::swap(workflowOptions, options); +} + +#include "Framework/runDataProcessing.h" + +WorkflowSpec defineDataProcessing(ConfigContext const& config) +{ + using namespace o2::tpc; + + // set up configuration + o2::conf::ConfigurableParam::updateFromFile(config.options().get("configFile")); + o2::conf::ConfigurableParam::updateFromString(config.options().get("configKeyValues")); + o2::conf::ConfigurableParam::writeINI("o2tpcaggregatecmv_configuration.ini"); + + const auto tpcCRUs = o2::RangeTokenizer::tokenize(config.options().get("crus")); + auto timeframes = static_cast(config.options().get("timeframes")); + int aggregateLanes = config.options().get("input-lanes"); + if (aggregateLanes <= 0) { + aggregateLanes = 1; + } + const bool usePreciseTimestamp = config.options().get("use-precise-timestamp"); + const bool sendCCDB = config.options().get("enable-CCDB-output"); + + int nTFsBuffer = config.options().get("n-TFs-buffer"); + if (nTFsBuffer <= 0) { + nTFsBuffer = 1; + } + + // convert total TFs per interval to number of buffered TFs + assert(timeframes >= static_cast(nTFsBuffer)); + timeframes /= static_cast(nTFsBuffer); + + const std::vector rangeCRUs(tpcCRUs.begin(), tpcCRUs.end()); + + WorkflowSpec workflow; + workflow.reserve(static_cast(aggregateLanes)); + LOGP(info, "Starting CMV aggregate with {} lanes, {} timeframes, {} n-TFs-buffer", aggregateLanes, timeframes, nTFsBuffer); + for (int ilane = 0; ilane < aggregateLanes; ++ilane) { + workflow.emplace_back(getTPCAggregateCMVSpec(ilane, rangeCRUs, timeframes, sendCCDB, usePreciseTimestamp, nTFsBuffer)); + } + return workflow; +} diff --git a/Detectors/TPC/workflow/src/tpc-distribute-cmv.cxx b/Detectors/TPC/workflow/src/tpc-distribute-cmv.cxx index b6aaaa0a109ad..0fe780ebb16b3 100644 --- a/Detectors/TPC/workflow/src/tpc-distribute-cmv.cxx +++ b/Detectors/TPC/workflow/src/tpc-distribute-cmv.cxx @@ -38,9 +38,9 @@ void customize(std::vector& workflowOptions) {"firstTF", VariantType::Int, -1, {"First time frame index. (if set to -1 the first TF will be automatically detected. Values < -1 are setting an offset for skipping the first TFs)"}}, {"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings"}}, {"lanes", VariantType::Int, 1, {"Number of lanes of this device (CRUs are split per lane)"}}, - {"use-precise-timestamp", VariantType::Bool, false, {"Use precise timestamp which can be used for writing to CCDB"}}, - {"enable-CCDB-output", VariantType::Bool, false, {"Send output to the CCDB populator"}}, - {"n-TFs-buffer", VariantType::Int, 1, {"Buffer which was defined in the TPCFLPCMVSpec."}}}; + {"send-precise-timestamp", VariantType::Bool, false, {"Send precise timestamp information to the CMV aggregate workflow"}}, + {"n-TFs-buffer", VariantType::Int, 1, {"Buffer which was defined in the TPCFLPCMVSpec."}}, + {"output-lanes", VariantType::Int, 1, {"Number of parallel pipelines which will be used in the CMV aggregate device."}}}; std::swap(workflowOptions, options); } @@ -57,17 +57,18 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config) const auto tpcCRUs = o2::RangeTokenizer::tokenize(config.options().get("crus")); const auto nCRUs = tpcCRUs.size(); auto timeframes = static_cast(config.options().get("timeframes")); + const auto outlanes = static_cast(config.options().get("output-lanes")); const auto nLanes = static_cast(config.options().get("lanes")); const auto firstTF = static_cast(config.options().get("firstTF")); - const bool usePreciseTimestamp = config.options().get("use-precise-timestamp"); - const bool sendCCDB = config.options().get("enable-CCDB-output"); + const bool sendPrecisetimeStamp = config.options().get("send-precise-timestamp"); int nTFsBuffer = config.options().get("n-TFs-buffer"); if (nTFsBuffer <= 0) { nTFsBuffer = 1; } - assert(timeframes >= nTFsBuffer); - timeframes /= nTFsBuffer; - LOGP(info, "Using {} timeframes as each TF contains {} CMVs", timeframes, nTFsBuffer); + assert(timeframes >= static_cast(nTFsBuffer)); + timeframes /= static_cast(nTFsBuffer); + LOGP(info, "Using {} buffered CMV batches per interval with n-TFs-buffer={}", timeframes, nTFsBuffer); + const auto crusPerLane = nCRUs / nLanes + ((nCRUs % nLanes) != 0); WorkflowSpec workflow; for (int ilane = 0; ilane < nLanes; ++ilane) { @@ -77,8 +78,8 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config) } const auto last = std::min(tpcCRUs.end(), first + crusPerLane); const std::vector rangeCRUs(first, last); - workflow.emplace_back(getTPCDistributeCMVSpec(ilane, rangeCRUs, timeframes, firstTF, sendCCDB, usePreciseTimestamp, nTFsBuffer)); + workflow.emplace_back(getTPCDistributeCMVSpec(ilane, rangeCRUs, timeframes, outlanes, firstTF, sendPrecisetimeStamp, nTFsBuffer)); } return workflow; -} \ No newline at end of file +}