1414#include " Framework/DataSpecUtils.h"
1515#include " Framework/EndOfStreamContext.h"
1616#include " Framework/ParallelContext.h"
17- #include " Framework/runDataProcessing.h"
1817#include " Framework/ControlService.h"
18+ #include " Framework/RawDeviceService.h"
1919#include " Framework/ParallelContext.h"
20+ #include " Framework/CompletionPolicy.h"
21+ #include " Framework/CompletionPolicyHelpers.h"
22+ #include " Framework/DataRefUtils.h"
23+ #include < FairMQDevice.h>
2024#include < iostream>
2125#include < algorithm>
2226#include < memory>
2327#include < unordered_map>
2428
29+ // customize clusterers and cluster decoders to process immediately what comes in
30+ void customize (std::vector<o2::framework::CompletionPolicy>& policies)
31+ {
32+ // we customize the pipeline processors to consume data as it comes
33+ using CompletionPolicy = o2::framework::CompletionPolicy;
34+ using CompletionPolicyHelpers = o2::framework::CompletionPolicyHelpers;
35+ policies.push_back (CompletionPolicyHelpers::defineByName (" consumer" , CompletionPolicy::CompletionOp::Consume));
36+ }
37+ #include " Framework/runDataProcessing.h"
38+
2539#define ASSERT_ERROR (condition ) \
2640 if ((condition) == false ) { \
2741 LOG (ERROR) << R"( Test condition ")" #condition R"( " failed)" ; \
@@ -84,6 +98,19 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const&)
8498 // nParallelChannels and is distributed among the pipelines
8599 std::vector<o2::header::DataHeader::SubSpecificationType> subspecs (nParallelChannels);
86100 std::generate (subspecs.begin (), subspecs.end (), [counter = std::make_shared<int >(0 )]() { return 0x1 << (*counter)++; });
101+ // correspondence between the subspec and the instance which serves this particular subspec
102+ // this is checked in the final consumer
103+ auto checkMap = std::make_shared<std::unordered_map<o2::header::DataHeader::SubSpecificationType, int >>();
104+ {
105+ size_t pipeline = 0 ;
106+ for (auto const & subspec : subspecs) {
107+ (*checkMap)[subspec] = pipeline;
108+ pipeline++;
109+ if (pipeline >= nPipelines) {
110+ pipeline = 0 ;
111+ }
112+ }
113+ }
87114 workflowSpecs = parallelPipeline (
88115 workflowSpecs, nPipelines,
89116 [&subspecs]() { return subspecs.size (); },
@@ -98,14 +125,11 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const&)
98125 return outputs;
99126 };
100127
101- // we keep the correspondence between the subspec and the instance which serves this particular subspec
102- // this is checked in the final consumer
103- auto checkMap = std::make_shared<std::unordered_map<o2::header::DataHeader::SubSpecificationType, int >>();
104128 workflowSpecs.emplace_back (DataProcessorSpec{
105129 " trigger" ,
106130 Inputs{},
107131 producerOutputs (),
108- AlgorithmSpec{[subspecs, checkMap, counter = std::make_shared<int >(0 )](ProcessingContext& ctx) {
132+ AlgorithmSpec{[subspecs, counter = std::make_shared<int >(0 )](ProcessingContext& ctx) {
109133 if (*counter < nRolls) {
110134 size_t pipeline = 0 ;
111135 size_t channels = subspecs.size ();
@@ -122,7 +146,6 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const&)
122146 continue ;
123147 }
124148 ctx.outputs ().make <int >(Output{" TST" , " TRIGGER" , subspecs[index], Lifetime::Timeframe}) = pipeline;
125- (*checkMap)[subspecs[index]] = pipeline;
126149 multiplicities[pipeline++]--;
127150 if (pipeline >= nPipelines) {
128151 pipeline = 0 ;
@@ -138,26 +161,63 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const&)
138161 }}});
139162
140163 // the final consumer
164+ // map of bindings is used to check the channel names, note that the object is captured by
165+ // reference in mergeInputs which is a helper executed at construction of DataProcessorSpec,
166+ // while the AlgorithmSpec stores a lambda to be called later on, and the object must be
167+ // passed by copy or move in order to have a valid object upon invocation
168+ std::unordered_map<o2::header::DataHeader::SubSpecificationType, std::string> bindings;
141169 workflowSpecs.emplace_back (DataProcessorSpec{
142170 " consumer" ,
143171 mergeInputs ({{" datain" , " TST" , " DATA" , 0 , Lifetime::Timeframe},
144172 {" metain" , " TST" , " META" , 0 , Lifetime::Timeframe}},
145173 subspecs.size (),
146- [&subspecs](InputSpec& input, size_t index) {
174+ [&subspecs, &bindings](InputSpec& input, size_t index) {
175+ input.binding += std::to_string (index);
147176 DataSpecUtils::updateMatchingSubspec (input, subspecs[index]);
177+ if (input.binding .compare (0 , 6 , " datain" ) == 0 ) {
178+ bindings[subspecs[index]] = input.binding ;
179+ }
148180 }),
149181 Outputs (),
150- AlgorithmSpec{adaptStateless ([checkMap](InputRecord& inputs, CallbackService& callbacks, ControlService&) {
151- callbacks.set (CallbackService::Id::EndOfStream, [](EndOfStreamContext& ctx) {
152- ctx.services ().get <ControlService>().readyToQuit (QuitRequest::All);
182+ AlgorithmSpec{adaptStateful ([checkMap, bindings = std::move (bindings)](CallbackService& callbacks) {
183+ callbacks.set (CallbackService::Id::EndOfStream, [checkMap](EndOfStreamContext& ctx) {
184+ for (auto const & [subspec, pipeline] : *checkMap) {
185+ // we require all checks to be invalidated
186+ ASSERT_ERROR (pipeline == -1 );
187+ }
188+ checkMap->clear ();
189+ });
190+ callbacks.set (CallbackService::Id::Stop, [checkMap]() {
191+ ASSERT_ERROR (checkMap->size () == 0 );
153192 });
154- for (auto const & input : inputs) {
155- LOG (DEBUG) << " consuming : " << *input.spec << " : " << *((int *)input.payload );
156- auto const * dataheader = DataRefUtils::getHeader<o2::header::DataHeader*>(input);
157- if (input.spec ->binding .compare (0 , 6 , " datain" ) == 0 ) {
158- ASSERT_ERROR ((*checkMap)[dataheader->subSpecification ] == inputs.get <int >(input.spec ->binding .c_str ()));
193+ return adaptStateless ([checkMap, bindings = std::move (bindings)](InputRecord& inputs) {
194+ bool haveDataIn = false ;
195+ for (auto const & input : inputs) {
196+ if (!DataRefUtils::isValid (input)) {
197+ continue ;
198+ }
199+ LOG (DEBUG) << " consuming : " << *input.spec << " : " << *((int *)input.payload );
200+ auto const * dataheader = DataRefUtils::getHeader<o2::header::DataHeader*>(input);
201+ if (input.spec ->binding .compare (0 , 6 , " datain" ) == 0 ) {
202+ if (input.spec ->binding != bindings.at (dataheader->subSpecification )) {
203+ LOG (ERROR) << " data with subspec " << dataheader->subSpecification << " at unexpected binding " << input.spec ->binding << " , expected " << bindings.at (dataheader->subSpecification );
204+ }
205+ haveDataIn = true ;
206+ ASSERT_ERROR (checkMap->at (dataheader->subSpecification ) == inputs.get <int >(input.spec ->binding .c_str ()));
207+ // keep a backup before invalidating, the backup is used in the check below, which can throw and therefor
208+ // must be after invalidation
209+ auto pipeline = checkMap->at (dataheader->subSpecification );
210+ // invalidate, we check in the end of stream callback that all are invalidated
211+ (*checkMap)[dataheader->subSpecification ] = -1 ;
212+ // check if we can access channels by binding
213+ if (inputs.isValid (bindings.at (dataheader->subSpecification ))) {
214+ ASSERT_ERROR (inputs.get <int >(bindings.at (dataheader->subSpecification )) == pipeline);
215+ }
216+ }
159217 }
160- }
218+ // we require each input cycle to have data on datain channel
219+ ASSERT_ERROR (haveDataIn);
220+ });
161221 })}});
162222
163223 return workflowSpecs;
0 commit comments