Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Framework/Core/include/Framework/TimesliceIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class TimesliceIndex

/// Find the lowest value for the timeslices in this instance.
/// This is the minimum between all the per channel oldest possible timeslices
/// and the oldest possible timeslice in-fly which is still dirty.
/// and the oldest possible timeslice in flight which is still dirty.
[[nodiscard]] OldestInputInfo getOldestPossibleInput() const;
[[nodiscard]] OldestOutputInfo getOldestPossibleOutput() const;
OldestOutputInfo updateOldestPossibleOutput(bool rewinded);
Expand Down
18 changes: 11 additions & 7 deletions Framework/Core/src/ArrowSupport.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -310,12 +310,12 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-destroyed");
static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-read");
static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-consumed");
static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-timeframes-in-fly");
static auto totalTimeframesInFlightMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-timeframes-in-flight");

static auto totalTimeslicesStartedMetric = createUint64DriverMetric("total-timeslices-started");
static auto totalTimeslicesExpiredMetric = createUint64DriverMetric("total-timeslices-expired");
static auto totalTimeslicesDoneMetric = createUint64DriverMetric("total-timeslices-done");
static auto totalTimeslicesInFlyMetric = createIntDriverMetric("total-timeslices-in-fly");
static auto totalTimeslicesInFlightMetric = createIntDriverMetric("total-timeslices-in-flight");

static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "arrow-bytes-delta");
static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "changed-metrics-count");
Expand Down Expand Up @@ -457,11 +457,11 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
totalTimeframesInFlyMetric(driverMetrics, (int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
totalTimeframesInFlightMetric(driverMetrics, (int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
totalTimeslicesStartedMetric(driverMetrics, totalTimeslicesStarted, timestamp);
totalTimeslicesExpiredMetric(driverMetrics, totalTimeslicesExpired, timestamp);
totalTimeslicesDoneMetric(driverMetrics, totalTimeslicesDone, timestamp);
totalTimeslicesInFlyMetric(driverMetrics, (int)(totalTimeslicesStarted - totalTimeslicesDone), timestamp);
totalTimeslicesInFlightMetric(driverMetrics, (int)(totalTimeslicesStarted - totalTimeslicesDone), timestamp);
totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
} else {
unchangedCount++;
Expand Down Expand Up @@ -696,10 +696,14 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
workflow.erase(reader);
} else {
// load reader algorithm before deployment
auto mctracks2aod = std::find_if(workflow.begin(), workflow.end(), [](auto const& x) { return x.name == "mctracks-to-aod"; });
if (mctracks2aod == workflow.end()) { // add normal reader algorithm only if no on-the-fly generator is injected
auto tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) {
return !spec.name.starts_with("internal-dpl-aod-reader") && std::ranges::any_of(spec.outputs, [](OutputSpec const& output) {
return DataSpecUtils::match(output, "TFN", "TFNumber", 0);
});
});
if (tfnsource == workflow.end()) { // add normal reader algorithm only if no on-the-fly generator is injected
reader->algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx));
} // otherwise the algorithm was set in injectServiceDevices
} // otherwise the algorithm was already set in injectServiceDevices
}
}

Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/src/DeviceSpecHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1732,7 +1732,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic
("error-on-exit-transition-timeout", bpo::value<bool>()->zero_tokens(), "print error instead of warning when exit transition timer expires") //
("data-processing-timeout", bpo::value<std::string>(), "timeout after which only calibration can happen") //
("expected-region-callbacks", bpo::value<std::string>(), "region callbacks to expect before starting") //
("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframes can be in fly") //
("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframes can be in flight") //
("shm-monitor", bpo::value<std::string>(), "whether to use the shared memory monitor") //
("channel-prefix", bpo::value<std::string>()->default_value(""), "prefix to use for multiplexing multiple workflows in the same session") //
("bad-alloc-max-attempts", bpo::value<std::string>()->default_value("1"), "throw after n attempts to alloc shm") //
Expand Down
29 changes: 24 additions & 5 deletions Framework/Core/src/WorkflowHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -456,15 +456,20 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
}
}

auto tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) {
return std::ranges::any_of(spec.outputs, [](OutputSpec const& output) {
return DataSpecUtils::match(output, "TFN", "TFNumber", 0);
});
});

// add the reader
if (aodReader.outputs.empty() == false) {
auto mctracks2aod = std::ranges::find_if(workflow, [](auto const& x) { return x.name == "mctracks-to-aod"; });
if (mctracks2aod == workflow.end()) {
if (tfnsource == workflow.end()) {
// add normal reader
aodReader.outputs.emplace_back(OutputSpec{"TFN", "TFNumber"});
aodReader.outputs.emplace_back(OutputSpec{"TFF", "TFFilename"});
} else {
// AODs are being injected on-the-fly, add error-handler reader
// AODs are being injected the tfnsource is the entry point, add error-handler reader
aodReader.algorithm = AlgorithmSpec{
adaptStateful(
[](DeviceSpec const& spec) {
Expand Down Expand Up @@ -595,6 +600,12 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext

std::vector<InputSpec> unmatched;
auto forwardingDestination = ctx.options().get<std::string>("forwarding-destination");
// update tfnsource iterator (could be aod-reader)
tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) {
return std::ranges::any_of(spec.outputs, [](OutputSpec const& output) {
return DataSpecUtils::match(output, "TFN", "TFNumber", 0);
});
});
if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "file") {
auto fileSink = CommonDataProcessors::getGlobalFileSink(redirectedOutputsInputs, unmatched);
if (unmatched.size() != redirectedOutputsInputs.size()) {
Expand All @@ -606,7 +617,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
} else if (forwardingDestination != "drop") {
throw runtime_error_f("Unknown forwarding destination %s", forwardingDestination.c_str());
}
if (unmatched.size() > 0 || redirectedOutputsInputs.size() > 0) {
if ((unmatched.size() > 0) || (redirectedOutputsInputs.size() > 0) || (tfnsource != workflow.end())) {
std::vector<InputSpec> ignored = unmatched;
ignored.insert(ignored.end(), redirectedOutputsInputs.begin(), redirectedOutputsInputs.end());
for (auto& ignoredInput : ignored) {
Expand All @@ -615,8 +626,11 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext

// Use the new dummy sink when the AOD reader is there
O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
if (aodReader.outputs.empty() == false) {
if (tfnsource != workflow.end()) {
O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting scheduled dummy sink");
// if there is a tfnsource, make sure the sink gets TFN/TFF
DataSpecUtils::updateInputList(ignored, InputSpec{"tfn", "TFN", "TFNumber", 0, Lifetime::Sporadic});
DataSpecUtils::updateInputList(ignored, InputSpec{"tff", "TFF", "TFFilename", 0, Lifetime::Sporadic});
extraSpecs.push_back(CommonDataProcessors::getScheduledDummySink(ignored));
} else {
O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting rate limited dummy sink");
Expand Down Expand Up @@ -742,6 +756,11 @@ void WorkflowHelpers::injectAODWriter(WorkflowSpec& workflow, ConfigContext cons
return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN"));
});
dec.isDangling[std::distance(dec.outputsInputs.begin(), it)] = false;

it = std::find_if(dec.outputsInputs.begin(), dec.outputsInputs.end(), [](InputSpec const& spec) -> bool {
return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFF"));
});
dec.isDangling[std::distance(dec.outputsInputs.begin(), it)] = false;
}
}

Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/src/runDataProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
("exit-transition-timeout", bpo::value<std::string>()->default_value(defaultExitTransitionTimeout), "how many second to wait before switching from RUN to READY") //
("error-on-exit-transition-timeout", bpo::value<bool>()->zero_tokens()->default_value(false), "print error instead of warning when exit transition timer expires") //
("data-processing-timeout", bpo::value<std::string>()->default_value(defaultDataProcessingTimeout), "how many second to wait before stopping data processing and allowing data calibration") //
("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframe can be in fly at the same moment (0 disables)") //
("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframe can be in flight at the same moment (0 disables)") //
("configuration,cfg", bpo::value<std::string>()->default_value("command-line"), "configuration backend") //
("infologger-mode", bpo::value<std::string>()->default_value(defaultInfologgerMode), "O2_INFOLOGGER_MODE override");
r.fConfig.AddToCmdLineOptions(optsDesc, true);
Expand Down
Loading