Skip to content

Commit 2339be6

Browse files
committed
Buffering of CM on FLPs
1 parent e669f51 commit 2339be6

File tree

3 files changed

+213
-4
lines changed

3 files changed

+213
-4
lines changed

Detectors/TPC/workflow/CMakeLists.txt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -294,10 +294,10 @@ o2_add_executable(cmv-to-vector
294294
SOURCES src/tpc-cmv-to-vector.cxx
295295
PUBLIC_LINK_LIBRARIES O2::TPCWorkflow)
296296

297-
# o2_add_executable(cmv-flp
298-
# COMPONENT_NAME tpc
299-
# SOURCES src/tpc-flp-cmv.cxx
300-
# PUBLIC_LINK_LIBRARIES O2::TPCWorkflow)
297+
o2_add_executable(cmv-flp
298+
COMPONENT_NAME tpc
299+
SOURCES src/tpc-flp-cmv.cxx
300+
PUBLIC_LINK_LIBRARIES O2::TPCWorkflow)
301301

302302
# o2_add_executable(cmv-distribute
303303
# COMPONENT_NAME tpc
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
/// @file TPCFLPIDCSpec.h
13+
/// @author Tuba Gündem, tuba.gundem@cern.ch
14+
/// @brief TPC device for processing CMVs on FLPs
15+
16+
#ifndef O2_TPCFLPIDCSPEC_H
17+
#define O2_TPCFLPIDCSPEC_H
18+
19+
#include <vector>
20+
#include <fmt/format.h>
21+
#include "Framework/Task.h"
22+
#include "Framework/ControlService.h"
23+
#include "Framework/Logger.h"
24+
#include "Framework/DataProcessorSpec.h"
25+
#include "Framework/InputRecordWalker.h"
26+
#include "Framework/ConfigParamRegistry.h"
27+
#include "Headers/DataHeader.h"
28+
#include "TPCWorkflow/ProcessingHelpers.h"
29+
#include "TPCBase/CRU.h"
30+
#include "TFile.h"
31+
32+
using namespace o2::framework;
33+
using o2::header::gDataOriginTPC;
34+
using namespace o2::tpc;
35+
36+
namespace o2::tpc
37+
{
38+
39+
class TPCFLPCMVDevice : public o2::framework::Task
40+
{
41+
public:
42+
TPCFLPCMVDevice(const int lane, const std::vector<uint32_t>& crus, const int nTFsBuffer)
43+
: mLane{lane}, mCRUs{crus}, mNTFsBuffer{nTFsBuffer} {}
44+
45+
void init(o2::framework::InitContext& ic) final
46+
{
47+
mDumpCMVs = ic.options().get<bool>("dump-cmvs-flp");
48+
}
49+
50+
void run(o2::framework::ProcessingContext& pc) final
51+
{
52+
// LOGP(info, "Processing CMVs for TF {} for CRUs {} to {}", processing_helpers::getCurrentTF(pc), mCRUs.front(), mCRUs.back());
53+
54+
++mCountTFsForBuffer;
55+
56+
for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) {
57+
auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
58+
const int cru = tpcCRUHeader->subSpecification >> 7;
59+
auto vecCMVs = pc.inputs().get<o2::pmr::vector<float>>(ref);
60+
mCMVs[cru].insert(mCMVs[cru].end(), vecCMVs.begin(), vecCMVs.end());
61+
}
62+
63+
if (mCountTFsForBuffer >= mNTFsBuffer) {
64+
mCountTFsForBuffer = 0;
65+
for (const auto cru : mCRUs) {
66+
// LOGP(info, "Sending CMVs of size {} for TF {}", mCMVs[cru].size(), processing_helpers::getCurrentTF(pc));
67+
sendOutput(pc.outputs(), cru);
68+
}
69+
}
70+
71+
if (mDumpCMVs) {
72+
TFile fOut(fmt::format("CMVs_{}_tf_{}.root", mLane, processing_helpers::getCurrentTF(pc)).data(), "RECREATE");
73+
for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) {
74+
auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
75+
const int cru = tpcCRUHeader->subSpecification >> 7;
76+
auto vec = pc.inputs().get<std::vector<float>>(ref);
77+
fOut.WriteObject(&vec, fmt::format("CRU_{}", cru).data());
78+
}
79+
}
80+
}
81+
82+
void endOfStream(o2::framework::EndOfStreamContext& ec) final
83+
{
84+
if (mCountTFsForBuffer > 0) {
85+
LOGP(info, "Flushing remaining {} buffered TFs at end of stream", mCountTFsForBuffer);
86+
for (const auto cru : mCRUs) {
87+
sendOutput(ec.outputs(), cru);
88+
}
89+
}
90+
ec.services().get<ControlService>().readyToQuit(QuitRequest::Me);
91+
}
92+
93+
static constexpr header::DataDescription getDataDescriptionCMVGroup(const Side side) { return (side == Side::A) ? getDataDescriptionCMVGroupA() : getDataDescriptionCMVGroupC(); }
94+
static constexpr header::DataDescription getDataDescriptionCMVGroupA() { return header::DataDescription{"CMVGROUPA"}; }
95+
static constexpr header::DataDescription getDataDescriptionCMVGroupC() { return header::DataDescription{"CMVGROUPC"}; }
96+
97+
private:
98+
const int mLane{}; ///< lane number of processor
99+
const std::vector<uint32_t> mCRUs{}; ///< CRUs to process in this instance
100+
int mNTFsBuffer{1}; ///< number of TFs to buffer before sending
101+
bool mDumpCMVs{}; ///< dump CMVs to file for debugging
102+
int mCountTFsForBuffer{0}; ///< counts TFs to track when to send output
103+
std::unordered_map<unsigned int, o2::pmr::vector<float>> mCMVs{}; ///< buffered CMV vectors per CRU
104+
const std::vector<InputSpec> mFilter = {{"cmvs", ConcreteDataTypeMatcher{gDataOriginTPC, "CMVVECTOR"}, Lifetime::Timeframe}};
105+
106+
void sendOutput(DataAllocator& output, const uint32_t cru)
107+
{
108+
const header::DataHeader::SubSpecificationType subSpec{cru << 7};
109+
output.adoptContainer(Output{gDataOriginTPC, getDataDescriptionCMVGroup(CRU(cru).side()), subSpec}, std::move(mCMVs[cru]));
110+
}
111+
};
112+
113+
DataProcessorSpec getTPCFLPCMVSpec(const int ilane, const std::vector<uint32_t>& crus, const int nTFsBuffer = 1)
114+
{
115+
std::vector<OutputSpec> outputSpecs;
116+
std::vector<InputSpec> inputSpecs;
117+
outputSpecs.reserve(crus.size());
118+
inputSpecs.reserve(crus.size());
119+
120+
for (const auto& cru : crus) {
121+
const header::DataHeader::SubSpecificationType subSpec{cru << 7};
122+
inputSpecs.emplace_back(InputSpec{"cmvs", gDataOriginTPC, "CMVVECTOR", subSpec, Lifetime::Timeframe});
123+
const Side side = CRU(cru).side();
124+
outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup(side), subSpec}, Lifetime::Sporadic);
125+
}
126+
127+
const auto id = fmt::format("tpc-flp-cmv-{:02}", ilane);
128+
return DataProcessorSpec{
129+
id.data(),
130+
inputSpecs,
131+
outputSpecs,
132+
AlgorithmSpec{adaptFromTask<TPCFLPCMVDevice>(ilane, crus, nTFsBuffer)},
133+
Options{{"dump-cmvs-flp", VariantType::Bool, false, {"Dump CMVs to file"}}}};
134+
}
135+
136+
} // namespace o2::tpc
137+
#endif
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#include <vector>
13+
#include <string>
14+
#include <thread>
15+
#include "CommonUtils/ConfigurableParam.h"
16+
#include "Algorithm/RangeTokenizer.h"
17+
#include "Framework/WorkflowSpec.h"
18+
#include "Framework/ConfigParamSpec.h"
19+
#include "TPCWorkflow/TPCFLPCMVSpec.h"
20+
#include "TPCBase/CRU.h"
21+
22+
using namespace o2::framework;
23+
24+
void customize(std::vector<ConfigParamSpec>& workflowOptions)
25+
{
26+
const std::string cruDefault = "0-" + std::to_string(o2::tpc::CRU::MaxCRU - 1);
27+
const int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2);
28+
29+
std::vector<ConfigParamSpec> options{
30+
{"configFile", VariantType::String, "", {"configuration file for configurable parameters"}},
31+
{"lanes", VariantType::Int, defaultlanes, {"Number of parallel processing lanes (crus are split per device)."}},
32+
{"time-lanes", VariantType::Int, 1, {"Number of parallel processing lanes (timeframes are split per device)."}},
33+
{"crus", VariantType::String, cruDefault.c_str(), {"List of CRUs, comma separated ranges, e.g. 0-3,7,9-15"}},
34+
{"n-TFs-buffer", VariantType::Int, 1, {"Buffer n-TFs before sending output."}},
35+
{"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings"}}};
36+
37+
std::swap(workflowOptions, options);
38+
}
39+
40+
#include "Framework/runDataProcessing.h"
41+
42+
WorkflowSpec defineDataProcessing(ConfigContext const& config)
43+
{
44+
using namespace o2::tpc;
45+
o2::conf::ConfigurableParam::updateFromString(config.options().get<std::string>("configKeyValues"));
46+
const auto tpcCRUs = o2::RangeTokenizer::tokenize<int>(config.options().get<std::string>("crus"));
47+
const auto nCRUs = tpcCRUs.size();
48+
const auto nLanes = std::min(static_cast<unsigned long>(config.options().get<int>("lanes")), nCRUs);
49+
const auto time_lanes = static_cast<unsigned int>(config.options().get<int>("time-lanes"));
50+
const auto crusPerLane = nCRUs / nLanes + ((nCRUs % nLanes) != 0);
51+
const int nTFsBuffer = config.options().get<int>("n-TFs-buffer");
52+
53+
o2::conf::ConfigurableParam::updateFromFile(config.options().get<std::string>("configFile"));
54+
o2::conf::ConfigurableParam::writeINI("o2tpcflp_configuration.ini");
55+
56+
WorkflowSpec workflow;
57+
if (nLanes <= 0) {
58+
return workflow;
59+
}
60+
61+
for (int ilane = 0; ilane < nLanes; ++ilane) {
62+
const auto first = tpcCRUs.begin() + ilane * crusPerLane;
63+
if (first >= tpcCRUs.end()) {
64+
break;
65+
}
66+
const auto last = std::min(tpcCRUs.end(), first + crusPerLane);
67+
const std::vector<uint32_t> rangeCRUs(first, last);
68+
workflow.emplace_back(timePipeline(getTPCFLPCMVSpec(ilane, rangeCRUs, nTFsBuffer), time_lanes));
69+
}
70+
71+
return workflow;
72+
}

0 commit comments

Comments
 (0)