Skip to content

Commit 9c5fda9

Browse files
authored
DPL Analysis: better detection for injected workflows (#15312)
1 parent e31bd4d commit 9c5fda9

8 files changed

Lines changed: 94 additions & 68 deletions

File tree

Framework/Core/include/Framework/TimesliceIndex.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ class TimesliceIndex
126126

127127
/// Find the lowest value for the timeslices in this instance.
128128
/// This is the minimum between all the per channel oldest possible timeslices
129-
/// and the oldest possible timeslice in-fly which is still dirty.
129+
/// and the oldest possible timeslice in flight which is still dirty.
130130
[[nodiscard]] OldestInputInfo getOldestPossibleInput() const;
131131
[[nodiscard]] OldestOutputInfo getOldestPossibleOutput() const;
132132
OldestOutputInfo updateOldestPossibleOutput(bool rewinded);

Framework/Core/src/ArrowSupport.cxx

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -310,12 +310,12 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
310310
static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-destroyed");
311311
static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-read");
312312
static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-consumed");
313-
static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-timeframes-in-fly");
313+
static auto totalTimeframesInFlightMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-timeframes-in-flight");
314314

315315
static auto totalTimeslicesStartedMetric = createUint64DriverMetric("total-timeslices-started");
316316
static auto totalTimeslicesExpiredMetric = createUint64DriverMetric("total-timeslices-expired");
317317
static auto totalTimeslicesDoneMetric = createUint64DriverMetric("total-timeslices-done");
318-
static auto totalTimeslicesInFlyMetric = createIntDriverMetric("total-timeslices-in-fly");
318+
static auto totalTimeslicesInFlightMetric = createIntDriverMetric("total-timeslices-in-flight");
319319

320320
static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "arrow-bytes-delta");
321321
static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "changed-metrics-count");
@@ -457,11 +457,11 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
457457
totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
458458
totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
459459
totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
460-
totalTimeframesInFlyMetric(driverMetrics, (int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
460+
totalTimeframesInFlightMetric(driverMetrics, (int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
461461
totalTimeslicesStartedMetric(driverMetrics, totalTimeslicesStarted, timestamp);
462462
totalTimeslicesExpiredMetric(driverMetrics, totalTimeslicesExpired, timestamp);
463463
totalTimeslicesDoneMetric(driverMetrics, totalTimeslicesDone, timestamp);
464-
totalTimeslicesInFlyMetric(driverMetrics, (int)(totalTimeslicesStarted - totalTimeslicesDone), timestamp);
464+
totalTimeslicesInFlightMetric(driverMetrics, (int)(totalTimeslicesStarted - totalTimeslicesDone), timestamp);
465465
totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
466466
} else {
467467
unchangedCount++;
@@ -696,10 +696,14 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
696696
workflow.erase(reader);
697697
} else {
698698
// load reader algorithm before deployment
699-
auto mctracks2aod = std::find_if(workflow.begin(), workflow.end(), [](auto const& x) { return x.name == "mctracks-to-aod"; });
700-
if (mctracks2aod == workflow.end()) { // add normal reader algorithm only if no on-the-fly generator is injected
699+
auto tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) {
700+
return !spec.name.starts_with("internal-dpl-aod-reader") && std::ranges::any_of(spec.outputs, [](OutputSpec const& output) {
701+
return DataSpecUtils::match(output, "TFN", "TFNumber", 0);
702+
});
703+
});
704+
if (tfnsource == workflow.end()) { // add normal reader algorithm only if no on-the-fly generator is injected
701705
reader->algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx));
702-
} // otherwise the algorithm was set in injectServiceDevices
706+
} // otherwise the algorithm was already set in injectServiceDevices
703707
}
704708
}
705709

Framework/Core/src/DeviceSpecHelpers.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1732,7 +1732,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic
17321732
("error-on-exit-transition-timeout", bpo::value<bool>()->zero_tokens(), "print error instead of warning when exit transition timer expires") //
17331733
("data-processing-timeout", bpo::value<std::string>(), "timeout after which only calibration can happen") //
17341734
("expected-region-callbacks", bpo::value<std::string>(), "region callbacks to expect before starting") //
1735-
("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframes can be in fly") //
1735+
("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframes can be in flight") //
17361736
("shm-monitor", bpo::value<std::string>(), "whether to use the shared memory monitor") //
17371737
("channel-prefix", bpo::value<std::string>()->default_value(""), "prefix to use for multiplexing multiple workflows in the same session") //
17381738
("bad-alloc-max-attempts", bpo::value<std::string>()->default_value("1"), "throw after n attempts to alloc shm") //

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -456,15 +456,20 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
456456
}
457457
}
458458

459+
auto tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) {
460+
return std::ranges::any_of(spec.outputs, [](OutputSpec const& output) {
461+
return DataSpecUtils::match(output, "TFN", "TFNumber", 0);
462+
});
463+
});
464+
459465
// add the reader
460466
if (aodReader.outputs.empty() == false) {
461-
auto mctracks2aod = std::ranges::find_if(workflow, [](auto const& x) { return x.name == "mctracks-to-aod"; });
462-
if (mctracks2aod == workflow.end()) {
467+
if (tfnsource == workflow.end()) {
463468
// add normal reader
464469
aodReader.outputs.emplace_back(OutputSpec{"TFN", "TFNumber"});
465470
aodReader.outputs.emplace_back(OutputSpec{"TFF", "TFFilename"});
466471
} else {
467-
// AODs are being injected on-the-fly, add error-handler reader
472+
// AODs are being injected the tfnsource is the entry point, add error-handler reader
468473
aodReader.algorithm = AlgorithmSpec{
469474
adaptStateful(
470475
[](DeviceSpec const& spec) {
@@ -595,6 +600,12 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
595600

596601
std::vector<InputSpec> unmatched;
597602
auto forwardingDestination = ctx.options().get<std::string>("forwarding-destination");
603+
// update tfnsource iterator (could be aod-reader)
604+
tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) {
605+
return std::ranges::any_of(spec.outputs, [](OutputSpec const& output) {
606+
return DataSpecUtils::match(output, "TFN", "TFNumber", 0);
607+
});
608+
});
598609
if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "file") {
599610
auto fileSink = CommonDataProcessors::getGlobalFileSink(redirectedOutputsInputs, unmatched);
600611
if (unmatched.size() != redirectedOutputsInputs.size()) {
@@ -606,7 +617,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
606617
} else if (forwardingDestination != "drop") {
607618
throw runtime_error_f("Unknown forwarding destination %s", forwardingDestination.c_str());
608619
}
609-
if (unmatched.size() > 0 || redirectedOutputsInputs.size() > 0) {
620+
if ((unmatched.size() > 0) || (redirectedOutputsInputs.size() > 0) || (tfnsource != workflow.end())) {
610621
std::vector<InputSpec> ignored = unmatched;
611622
ignored.insert(ignored.end(), redirectedOutputsInputs.begin(), redirectedOutputsInputs.end());
612623
for (auto& ignoredInput : ignored) {
@@ -615,8 +626,11 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
615626

616627
// Use the new dummy sink when the AOD reader is there
617628
O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
618-
if (aodReader.outputs.empty() == false) {
629+
if (tfnsource != workflow.end()) {
619630
O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting scheduled dummy sink");
631+
// if there is a tfnsource, make sure the sink gets TFN/TFF
632+
DataSpecUtils::updateInputList(ignored, InputSpec{"tfn", "TFN", "TFNumber", 0, Lifetime::Sporadic});
633+
DataSpecUtils::updateInputList(ignored, InputSpec{"tff", "TFF", "TFFilename", 0, Lifetime::Sporadic});
620634
extraSpecs.push_back(CommonDataProcessors::getScheduledDummySink(ignored));
621635
} else {
622636
O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting rate limited dummy sink");
@@ -742,6 +756,11 @@ void WorkflowHelpers::injectAODWriter(WorkflowSpec& workflow, ConfigContext cons
742756
return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN"));
743757
});
744758
dec.isDangling[std::distance(dec.outputsInputs.begin(), it)] = false;
759+
760+
it = std::find_if(dec.outputsInputs.begin(), dec.outputsInputs.end(), [](InputSpec const& spec) -> bool {
761+
return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFF"));
762+
});
763+
dec.isDangling[std::distance(dec.outputsInputs.begin(), it)] = false;
745764
}
746765
}
747766

Framework/Core/src/runDataProcessing.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1063,7 +1063,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
10631063
("exit-transition-timeout", bpo::value<std::string>()->default_value(defaultExitTransitionTimeout), "how many second to wait before switching from RUN to READY") //
10641064
("error-on-exit-transition-timeout", bpo::value<bool>()->zero_tokens()->default_value(false), "print error instead of warning when exit transition timer expires") //
10651065
("data-processing-timeout", bpo::value<std::string>()->default_value(defaultDataProcessingTimeout), "how many second to wait before stopping data processing and allowing data calibration") //
1066-
("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframe can be in fly at the same moment (0 disables)") //
1066+
("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframe can be in flight at the same moment (0 disables)") //
10671067
("configuration,cfg", bpo::value<std::string>()->default_value("command-line"), "configuration backend") //
10681068
("infologger-mode", bpo::value<std::string>()->default_value(defaultInfologgerMode), "O2_INFOLOGGER_MODE override");
10691069
r.fConfig.AddToCmdLineOptions(optsDesc, true);

0 commit comments

Comments
 (0)