Skip to content

Commit f7eb5d5

Browse files
committed
explicitly subscribe dummy sink to TFN&TFF
1 parent c66faec commit f7eb5d5

2 files changed

Lines changed: 18 additions & 8 deletions

File tree

Framework/Core/src/ArrowSupport.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
703703
});
704704
if (tfnsource == workflow.end()) { // add normal reader algorithm only if no on-the-fly generator is injected
705705
reader->algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx));
706-
} // otherwise the algorithm was set in injectServiceDevices
706+
} // otherwise the algorithm was already set in injectServiceDevices
707707
}
708708
}
709709

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -456,13 +456,14 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
456456
}
457457
}
458458

459+
auto tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) {
460+
return std::ranges::any_of(spec.outputs, [](OutputSpec const& output) {
461+
return DataSpecUtils::match(output, "TFN", "TFNumber", 0);
462+
});
463+
});
464+
459465
// add the reader
460466
if (aodReader.outputs.empty() == false) {
461-
auto tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) {
462-
return std::ranges::any_of(spec.outputs, [](OutputSpec const& output) {
463-
return DataSpecUtils::match(output, "TFN", "TFNumber", 0);
464-
});
465-
});
466467
if (tfnsource == workflow.end()) {
467468
// add normal reader
468469
aodReader.outputs.emplace_back(OutputSpec{"TFN", "TFNumber"});
@@ -599,6 +600,12 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
599600

600601
std::vector<InputSpec> unmatched;
601602
auto forwardingDestination = ctx.options().get<std::string>("forwarding-destination");
603+
// update tfnsource iterator (could be aod-reader)
604+
tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) {
605+
return std::ranges::any_of(spec.outputs, [](OutputSpec const& output) {
606+
return DataSpecUtils::match(output, "TFN", "TFNumber", 0);
607+
});
608+
});
602609
if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "file") {
603610
auto fileSink = CommonDataProcessors::getGlobalFileSink(redirectedOutputsInputs, unmatched);
604611
if (unmatched.size() != redirectedOutputsInputs.size()) {
@@ -610,7 +617,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
610617
} else if (forwardingDestination != "drop") {
611618
throw runtime_error_f("Unknown forwarding destination %s", forwardingDestination.c_str());
612619
}
613-
if (unmatched.size() > 0 || redirectedOutputsInputs.size() > 0) {
620+
if ((unmatched.size() > 0) || (redirectedOutputsInputs.size() > 0) || (tfnsource != workflow.end())) {
614621
std::vector<InputSpec> ignored = unmatched;
615622
ignored.insert(ignored.end(), redirectedOutputsInputs.begin(), redirectedOutputsInputs.end());
616623
for (auto& ignoredInput : ignored) {
@@ -619,8 +626,11 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
619626

620627
// Use the new dummy sink when the AOD reader is there
621628
O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
622-
if (aodReader.outputs.empty() == false) {
629+
if (tfnsource != workflow.end()) {
623630
O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting scheduled dummy sink");
631+
// if there is a tfnsource, make sure the sink gets TFN/TFF
632+
ignored.emplace_back("tfn", "TFN", "TFNumber");
633+
ignored.emplace_back("tff", "TFF", "TFFilename");
624634
extraSpecs.push_back(CommonDataProcessors::getScheduledDummySink(ignored));
625635
} else {
626636
O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting rate limited dummy sink");

0 commit comments

Comments
 (0)