From c66faec60ff147a4545c3b4585891ed4ca2da757 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Fri, 13 Mar 2026 10:40:27 +0100 Subject: [PATCH 1/7] detect tfnsource rather than the converter --- Framework/Core/src/ArrowSupport.cxx | 8 ++++++-- Framework/Core/src/WorkflowHelpers.cxx | 15 ++++++++++++--- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 1819307e26806..dd09303dd5d0d 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -696,8 +696,12 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() workflow.erase(reader); } else { // load reader algorithm before deployment - auto mctracks2aod = std::find_if(workflow.begin(), workflow.end(), [](auto const& x) { return x.name == "mctracks-to-aod"; }); - if (mctracks2aod == workflow.end()) { // add normal reader algorithm only if no on-the-fly generator is injected + auto tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { + return !spec.name.starts_with("internal-dpl-aod-reader") && std::ranges::any_of(spec.outputs, [](OutputSpec const& output) { + return DataSpecUtils::match(output, "TFN", "TFNumber", 0); + }); + }); + if (tfnsource == workflow.end()) { // add normal reader algorithm only if no on-the-fly generator is injected reader->algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx)); } // otherwise the algorithm was set in injectServiceDevices } diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index a97fcf2897abd..a01916c9b3217 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -458,13 +458,17 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext // add the reader if (aodReader.outputs.empty() == false) { - auto mctracks2aod = std::ranges::find_if(workflow, [](auto const& x) { return x.name == "mctracks-to-aod"; }); - if (mctracks2aod == workflow.end()) { + auto tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { + return std::ranges::any_of(spec.outputs, [](OutputSpec const& output) { + return DataSpecUtils::match(output, "TFN", "TFNumber", 0); + }); + }); + if (tfnsource == workflow.end()) { // add normal reader aodReader.outputs.emplace_back(OutputSpec{"TFN", "TFNumber"}); aodReader.outputs.emplace_back(OutputSpec{"TFF", "TFFilename"}); } else { - // AODs are being injected on-the-fly, add error-handler reader + // AODs are being injected the tfnsource is the entry point, add error-handler reader aodReader.algorithm = AlgorithmSpec{ adaptStateful( [](DeviceSpec const& spec) { @@ -742,6 +746,11 @@ void WorkflowHelpers::injectAODWriter(WorkflowSpec& workflow, ConfigContext cons return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN")); }); dec.isDangling[std::distance(dec.outputsInputs.begin(), it)] = false; + + it = std::find_if(dec.outputsInputs.begin(), dec.outputsInputs.end(), [](InputSpec const& spec) -> bool { + return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFF")); + }); + dec.isDangling[std::distance(dec.outputsInputs.begin(), it)] = false; } } From f7eb5d502154bea75a35f517434853e067561fdc Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 31 Mar 2026 09:33:00 +0200 Subject: [PATCH 2/7] explicitly subscribe dummy sink to TFN&TFF --- Framework/Core/src/ArrowSupport.cxx | 2 +- Framework/Core/src/WorkflowHelpers.cxx | 24 +++++++++++++++++------- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index dd09303dd5d0d..0e63c7ab87496 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -703,7 +703,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() }); if (tfnsource == workflow.end()) { // add normal reader algorithm only if no on-the-fly generator is injected reader->algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx)); - } // otherwise the algorithm was set in injectServiceDevices + } // otherwise the algorithm was already set in injectServiceDevices } } diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index a01916c9b3217..619acfa06253c 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -456,13 +456,14 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext } } + auto tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { + return std::ranges::any_of(spec.outputs, [](OutputSpec const& output) { + return DataSpecUtils::match(output, "TFN", "TFNumber", 0); + }); + }); + // add the reader if (aodReader.outputs.empty() == false) { - auto tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { - return std::ranges::any_of(spec.outputs, [](OutputSpec const& output) { - return DataSpecUtils::match(output, "TFN", "TFNumber", 0); - }); - }); if (tfnsource == workflow.end()) { // add normal reader aodReader.outputs.emplace_back(OutputSpec{"TFN", "TFNumber"}); @@ -599,6 +600,12 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext std::vector unmatched; auto forwardingDestination = ctx.options().get("forwarding-destination"); + // update tfnsource iterator (could be aod-reader) + tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { + return std::ranges::any_of(spec.outputs, [](OutputSpec const& output) { + return DataSpecUtils::match(output, "TFN", "TFNumber", 0); + }); + }); if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "file") { auto fileSink = CommonDataProcessors::getGlobalFileSink(redirectedOutputsInputs, unmatched); if (unmatched.size() != redirectedOutputsInputs.size()) { @@ -610,7 +617,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext } else if (forwardingDestination != "drop") { throw runtime_error_f("Unknown forwarding destination %s", forwardingDestination.c_str()); } - if (unmatched.size() > 0 || redirectedOutputsInputs.size() > 0) { + if ((unmatched.size() > 0) || (redirectedOutputsInputs.size() > 0) || (tfnsource != workflow.end())) { std::vector ignored = unmatched; ignored.insert(ignored.end(), redirectedOutputsInputs.begin(), redirectedOutputsInputs.end()); for (auto& ignoredInput : ignored) { @@ -619,8 +626,11 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext // Use the new dummy sink when the AOD reader is there O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers); - if (aodReader.outputs.empty() == false) { + if (tfnsource != workflow.end()) { O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting scheduled dummy sink"); + // if there is a tfnsource, make sure the sink gets TFN/TFF + ignored.emplace_back("tfn", "TFN", "TFNumber"); + ignored.emplace_back("tff", "TFF", "TFFilename"); extraSpecs.push_back(CommonDataProcessors::getScheduledDummySink(ignored)); } else { O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting rate limited dummy sink"); From 8b65b88da3d36dacd1c38972e9fdd49e3f1e6e95 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Mon, 30 Mar 2026 11:26:02 +0200 Subject: [PATCH 3/7] fix typo --- Framework/Core/include/Framework/TimesliceIndex.h | 2 +- Framework/Core/src/ArrowSupport.cxx | 8 ++++---- Framework/Core/src/DeviceSpecHelpers.cxx | 2 +- Framework/Core/src/runDataProcessing.cxx | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Framework/Core/include/Framework/TimesliceIndex.h b/Framework/Core/include/Framework/TimesliceIndex.h index ac3970bec00ee..ea612f701152c 100644 --- a/Framework/Core/include/Framework/TimesliceIndex.h +++ b/Framework/Core/include/Framework/TimesliceIndex.h @@ -126,7 +126,7 @@ class TimesliceIndex /// Find the lowest value for the timeslices in this instance. /// This is the minimum between all the per channel oldest possible timeslices - /// and the oldest possible timeslice in-fly which is still dirty. + /// and the oldest possible timeslice in flight which is still dirty. [[nodiscard]] OldestInputInfo getOldestPossibleInput() const; [[nodiscard]] OldestOutputInfo getOldestPossibleOutput() const; OldestOutputInfo updateOldestPossibleOutput(bool rewinded); diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 0e63c7ab87496..780c836437c2b 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -310,12 +310,12 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-arrow-messages-destroyed"); static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-timeframes-read"); static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-timeframes-consumed"); - static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-timeframes-in-fly"); + static auto totalTimeframesInFlightMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-timeframes-in-flight"); static auto totalTimeslicesStartedMetric = createUint64DriverMetric("total-timeslices-started"); static auto totalTimeslicesExpiredMetric = createUint64DriverMetric("total-timeslices-expired"); static auto totalTimeslicesDoneMetric = createUint64DriverMetric("total-timeslices-done"); - static auto totalTimeslicesInFlyMetric = createIntDriverMetric("total-timeslices-in-fly"); + static auto totalTimeslicesInFlightMetric = createIntDriverMetric("total-timeslices-in-flight"); static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "arrow-bytes-delta"); static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "changed-metrics-count"); @@ -457,11 +457,11 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp); totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp); totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp); - totalTimeframesInFlyMetric(driverMetrics, (int)(totalTimeframesRead - totalTimeframesConsumed), timestamp); + totalTimeframesInFlightMetric(driverMetrics, (int)(totalTimeframesRead - totalTimeframesConsumed), timestamp); totalTimeslicesStartedMetric(driverMetrics, totalTimeslicesStarted, timestamp); totalTimeslicesExpiredMetric(driverMetrics, totalTimeslicesExpired, timestamp); totalTimeslicesDoneMetric(driverMetrics, totalTimeslicesDone, timestamp); - totalTimeslicesInFlyMetric(driverMetrics, (int)(totalTimeslicesStarted - totalTimeslicesDone), timestamp); + totalTimeslicesInFlightMetric(driverMetrics, (int)(totalTimeslicesStarted - totalTimeslicesDone), timestamp); totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp); } else { unchangedCount++; diff --git a/Framework/Core/src/DeviceSpecHelpers.cxx b/Framework/Core/src/DeviceSpecHelpers.cxx index 88e5269482ebd..49ee83e5aadfe 100644 --- a/Framework/Core/src/DeviceSpecHelpers.cxx +++ b/Framework/Core/src/DeviceSpecHelpers.cxx @@ -1732,7 +1732,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic ("error-on-exit-transition-timeout", bpo::value()->zero_tokens(), "print error instead of warning when exit transition timer expires") // ("data-processing-timeout", bpo::value(), "timeout after which only calibration can happen") // ("expected-region-callbacks", bpo::value(), "region callbacks to expect before starting") // - ("timeframes-rate-limit", bpo::value()->default_value("0"), "how many timeframes can be in fly") // + ("timeframes-rate-limit", bpo::value()->default_value("0"), "how many timeframes can be in flight") // ("shm-monitor", bpo::value(), "whether to use the shared memory monitor") // ("channel-prefix", bpo::value()->default_value(""), "prefix to use for multiplexing multiple workflows in the same session") // ("bad-alloc-max-attempts", bpo::value()->default_value("1"), "throw after n attempts to alloc shm") // diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 03cef9a034144..41b49516b155b 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -1063,7 +1063,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, ("exit-transition-timeout", bpo::value()->default_value(defaultExitTransitionTimeout), "how many second to wait before switching from RUN to READY") // ("error-on-exit-transition-timeout", bpo::value()->zero_tokens()->default_value(false), "print error instead of warning when exit transition timer expires") // ("data-processing-timeout", bpo::value()->default_value(defaultDataProcessingTimeout), "how many second to wait before stopping data processing and allowing data calibration") // - ("timeframes-rate-limit", bpo::value()->default_value("0"), "how many timeframe can be in fly at the same moment (0 disables)") // + ("timeframes-rate-limit", bpo::value()->default_value("0"), "how many timeframe can be in flight at the same moment (0 disables)") // ("configuration,cfg", bpo::value()->default_value("command-line"), "configuration backend") // ("infologger-mode", bpo::value()->default_value(defaultInfologgerMode), "O2_INFOLOGGER_MODE override"); r.fConfig.AddToCmdLineOptions(optsDesc, true); From 3ea925e9b3604b8433bfe16cf4f7a09b4c49ed18 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Fri, 13 Mar 2026 10:51:26 +0100 Subject: [PATCH 4/7] fix and update kine- and hepmc-publishers --- run/o2sim_hepmc_publisher.cxx | 94 ++++++++++++++++++----------------- run/o2sim_kine_publisher.cxx | 3 +- 2 files changed, 50 insertions(+), 47 deletions(-) diff --git a/run/o2sim_hepmc_publisher.cxx b/run/o2sim_hepmc_publisher.cxx index bf40abacb134f..f255b4a3a4f62 100644 --- a/run/o2sim_hepmc_publisher.cxx +++ b/run/o2sim_hepmc_publisher.cxx @@ -37,7 +37,9 @@ struct O2simHepmcPublisher { int tfCounter = 0; std::shared_ptr hepMCReader; bool eos = false; - std::vector mcTracks; + + std::vector*> mctracks_vector; + std::vector mcheader_vector; void init(o2::framework::InitContext& /*ic*/) { @@ -50,13 +52,19 @@ struct O2simHepmcPublisher { LOGP(fatal, "Cannot open HEPMC kine file {}", (std::string)hepmcFileName); } // allocate the memory upfront to prevent reallocations later - mcTracks.reserve(1e3 * aggregate); + mctracks_vector.reserve(aggregate); + mcheader_vector.reserve(aggregate); } void run(o2::framework::ProcessingContext& pc) { HepMC3::GenEvent event; - for (auto i = 0; i < (int)aggregate; ++i) { + auto batch = maxEvents > 0 ? std::min((int)aggregate, (int)maxEvents - eventCounter) : (int)aggregate; + for (auto i = 0; i < batch; ++i) { + mctracks_vector.push_back(&pc.outputs().make>(Output{"MC", "MCTRACKS", 0})); + auto& mctracks = mctracks_vector.back(); + mcheader_vector.push_back(&pc.outputs().make(Output{"MC", "MCHEADER", 0})); + auto& mcheader = mcheader_vector.back(); // read next entry hepMCReader->read_event(event); if (hepMCReader->failed()) { @@ -66,61 +74,60 @@ struct O2simHepmcPublisher { } // create O2 MCHeader and MCtracks vector out of HEPMC event - o2::dataformats::MCEventHeader mcHeader; - mcHeader.SetEventID(event.event_number()); - mcHeader.SetVertex(event.event_pos().px(), event.event_pos().py(), event.event_pos().pz()); + mcheader->SetEventID(event.event_number()); + mcheader->SetVertex(event.event_pos().px(), event.event_pos().py(), event.event_pos().pz()); auto xsecInfo = event.cross_section(); if (xsecInfo != nullptr) { - mcHeader.putInfo(MCInfoKeys::acceptedEvents, (uint64_t)xsecInfo->get_accepted_events()); - mcHeader.putInfo(MCInfoKeys::attemptedEvents, (uint64_t)xsecInfo->get_attempted_events()); - mcHeader.putInfo(MCInfoKeys::xSection, (float)xsecInfo->xsec()); - mcHeader.putInfo(MCInfoKeys::xSectionError, (float)xsecInfo->xsec_err()); + mcheader->putInfo(MCInfoKeys::acceptedEvents, (uint64_t)xsecInfo->get_accepted_events()); + mcheader->putInfo(MCInfoKeys::attemptedEvents, (uint64_t)xsecInfo->get_attempted_events()); + mcheader->putInfo(MCInfoKeys::xSection, (float)xsecInfo->xsec()); + mcheader->putInfo(MCInfoKeys::xSectionError, (float)xsecInfo->xsec_err()); } auto scale = event.attribute(MCInfoKeys::eventScale); if (scale != nullptr) { - mcHeader.putInfo(MCInfoKeys::eventScale, (float)scale->value()); + mcheader->putInfo(MCInfoKeys::eventScale, (float)scale->value()); } auto nMPI = event.attribute(MCInfoKeys::mpi); if (nMPI != nullptr) { - mcHeader.putInfo(MCInfoKeys::mpi, nMPI->value()); + mcheader->putInfo(MCInfoKeys::mpi, nMPI->value()); } auto sid = event.attribute(MCInfoKeys::processCode); auto scode = event.attribute(MCInfoKeys::processID); // default pythia8 hepmc3 interface uses signal_process_id if (sid != nullptr) { - mcHeader.putInfo(MCInfoKeys::processCode, sid->value()); + mcheader->putInfo(MCInfoKeys::processCode, sid->value()); } else if (scode != nullptr) { - mcHeader.putInfo(MCInfoKeys::processCode, scode->value()); + mcheader->putInfo(MCInfoKeys::processCode, scode->value()); } auto pdfInfo = event.pdf_info(); if (pdfInfo != nullptr) { - mcHeader.putInfo(MCInfoKeys::pdfParton1Id, pdfInfo->parton_id[0]); - mcHeader.putInfo(MCInfoKeys::pdfParton2Id, pdfInfo->parton_id[1]); - mcHeader.putInfo(MCInfoKeys::pdfCode1, pdfInfo->pdf_id[0]); - mcHeader.putInfo(MCInfoKeys::pdfCode2, pdfInfo->pdf_id[1]); - mcHeader.putInfo(MCInfoKeys::pdfX1, (float)pdfInfo->x[0]); - mcHeader.putInfo(MCInfoKeys::pdfX2, (float)pdfInfo->x[1]); - mcHeader.putInfo(MCInfoKeys::pdfScale, (float)pdfInfo->scale); - mcHeader.putInfo(MCInfoKeys::pdfXF1, (float)pdfInfo->xf[0]); - mcHeader.putInfo(MCInfoKeys::pdfXF2, (float)pdfInfo->xf[1]); + mcheader->putInfo(MCInfoKeys::pdfParton1Id, pdfInfo->parton_id[0]); + mcheader->putInfo(MCInfoKeys::pdfParton2Id, pdfInfo->parton_id[1]); + mcheader->putInfo(MCInfoKeys::pdfCode1, pdfInfo->pdf_id[0]); + mcheader->putInfo(MCInfoKeys::pdfCode2, pdfInfo->pdf_id[1]); + mcheader->putInfo(MCInfoKeys::pdfX1, (float)pdfInfo->x[0]); + mcheader->putInfo(MCInfoKeys::pdfX2, (float)pdfInfo->x[1]); + mcheader->putInfo(MCInfoKeys::pdfScale, (float)pdfInfo->scale); + mcheader->putInfo(MCInfoKeys::pdfXF1, (float)pdfInfo->xf[0]); + mcheader->putInfo(MCInfoKeys::pdfXF2, (float)pdfInfo->xf[1]); } auto heavyIon = event.heavy_ion(); if (heavyIon != nullptr) { - mcHeader.putInfo(MCInfoKeys::nCollHard, heavyIon->Ncoll_hard); - mcHeader.putInfo(MCInfoKeys::nPartProjectile, heavyIon->Npart_proj); - mcHeader.putInfo(MCInfoKeys::nPartTarget, heavyIon->Npart_targ); - mcHeader.putInfo(MCInfoKeys::nColl, heavyIon->Ncoll); - mcHeader.putInfo(MCInfoKeys::nCollNNWounded, heavyIon->N_Nwounded_collisions); - mcHeader.putInfo(MCInfoKeys::nCollNWoundedN, heavyIon->Nwounded_N_collisions); - mcHeader.putInfo(MCInfoKeys::nCollNWoundedNwounded, heavyIon->Nwounded_Nwounded_collisions); - mcHeader.putInfo(MCInfoKeys::nSpecProjectileNeutron, heavyIon->Nspec_proj_n); - mcHeader.putInfo(MCInfoKeys::nSpecProjectileProton, heavyIon->Nspec_proj_p); - mcHeader.putInfo(MCInfoKeys::nSpecTargetNeutron, heavyIon->Nspec_targ_n); - mcHeader.putInfo(MCInfoKeys::nSpecTargetProton, heavyIon->Nspec_targ_p); - mcHeader.putInfo(MCInfoKeys::impactParameter, (float)heavyIon->impact_parameter); - mcHeader.putInfo(MCInfoKeys::planeAngle, (float)heavyIon->event_plane_angle); - mcHeader.putInfo("eccentricity", (float)heavyIon->eccentricity); - mcHeader.putInfo(MCInfoKeys::sigmaInelNN, (float)heavyIon->sigma_inel_NN); - mcHeader.putInfo(MCInfoKeys::centrality, (float)heavyIon->centrality); + mcheader->putInfo(MCInfoKeys::nCollHard, heavyIon->Ncoll_hard); + mcheader->putInfo(MCInfoKeys::nPartProjectile, heavyIon->Npart_proj); + mcheader->putInfo(MCInfoKeys::nPartTarget, heavyIon->Npart_targ); + mcheader->putInfo(MCInfoKeys::nColl, heavyIon->Ncoll); + mcheader->putInfo(MCInfoKeys::nCollNNWounded, heavyIon->N_Nwounded_collisions); + mcheader->putInfo(MCInfoKeys::nCollNWoundedN, heavyIon->Nwounded_N_collisions); + mcheader->putInfo(MCInfoKeys::nCollNWoundedNwounded, heavyIon->Nwounded_Nwounded_collisions); + mcheader->putInfo(MCInfoKeys::nSpecProjectileNeutron, heavyIon->Nspec_proj_n); + mcheader->putInfo(MCInfoKeys::nSpecProjectileProton, heavyIon->Nspec_proj_p); + mcheader->putInfo(MCInfoKeys::nSpecTargetNeutron, heavyIon->Nspec_targ_n); + mcheader->putInfo(MCInfoKeys::nSpecTargetProton, heavyIon->Nspec_targ_p); + mcheader->putInfo(MCInfoKeys::impactParameter, (float)heavyIon->impact_parameter); + mcheader->putInfo(MCInfoKeys::planeAngle, (float)heavyIon->event_plane_angle); + mcheader->putInfo("eccentricity", (float)heavyIon->eccentricity); + mcheader->putInfo(MCInfoKeys::sigmaInelNN, (float)heavyIon->sigma_inel_NN); + mcheader->putInfo(MCInfoKeys::centrality, (float)heavyIon->centrality); } auto particles = event.particles(); @@ -131,7 +138,7 @@ struct O2simHepmcPublisher { auto has_children = children.size() > 0; auto p = particle->momentum(); auto v = particle->production_vertex(); - mcTracks.emplace_back( + mctracks->emplace_back( particle->pid(), has_parents ? parents.front()->id() : -1, has_parents ? parents.back()->id() : -1, has_children ? children.front()->id() : -1, has_children ? children.back()->id() : -1, @@ -139,18 +146,13 @@ struct O2simHepmcPublisher { v->position().x(), v->position().y(), v->position().z(), v->position().t(), 0); } - - // add to the message - pc.outputs().snapshot(Output{"MC", "MCHEADER", 0}, mcHeader); - pc.outputs().snapshot(Output{"MC", "MCTRACKS", 0}, mcTracks); - mcTracks.clear(); ++eventCounter; } // report number of TFs injected for the rate limiter to work ++tfCounter; pc.services().get().send(o2::monitoring::Metric{(uint64_t)tfCounter, "df-sent"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL)); - if (eos || (maxEvents > 0 && eventCounter == maxEvents)) { + if (eos || (maxEvents > 0 && eventCounter >= maxEvents)) { pc.services().get().endOfStream(); pc.services().get().readyToQuit(QuitRequest::Me); } diff --git a/run/o2sim_kine_publisher.cxx b/run/o2sim_kine_publisher.cxx index cfbea6ae02a5f..5920743c3fafa 100644 --- a/run/o2sim_kine_publisher.cxx +++ b/run/o2sim_kine_publisher.cxx @@ -40,7 +40,8 @@ struct O2simKinePublisher { void run(o2::framework::ProcessingContext& pc) { - for (auto i = 0; i < std::min((int)aggregate, nEvents - eventCounter); ++i) { + auto batch = std::min((int)aggregate, nEvents - eventCounter); + for (auto i = 0; i < batch; ++i) { auto mcevent = mcKinReader->getMCEventHeader(0, eventCounter); auto mctracks = mcKinReader->getTracks(0, eventCounter); pc.outputs().snapshot(Output{"MC", "MCHEADER", 0}, mcevent); From f63e501fab216303175aa972e5d645da9bc55cc1 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Fri, 13 Mar 2026 10:55:14 +0100 Subject: [PATCH 5/7] use detail instead of debug --- run/o2sim_mctracks_to_aod.cxx | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/run/o2sim_mctracks_to_aod.cxx b/run/o2sim_mctracks_to_aod.cxx index 124e8aa7b3e42..d95a3b33cc38f 100644 --- a/run/o2sim_mctracks_to_aod.cxx +++ b/run/o2sim_mctracks_to_aod.cxx @@ -70,7 +70,7 @@ struct MctracksToAod { /** Run the conversion */ void run(o2::framework::ProcessingContext& pc) { - LOG(debug) << "=== Running extended MC AOD exporter ==="; + LOG(detail) << "=== Running extended MC AOD exporter ==="; using namespace o2::aodmchelpers; using McHeader = o2::dataformats::MCEventHeader; using McTrack = o2::MCTrack; @@ -94,13 +94,13 @@ struct MctracksToAod { // TODO: include BC simulation auto bcCounter = 0UL; size_t offset = 0; - LOG(debug) << "--- Loop over " << nParts << " parts ---"; + LOG(detail) << "--- Loop over " << nParts << " parts ---"; for (auto i = 0U; i < nParts; ++i) { auto record = mSampler.generateCollisionTime(); auto header = pc.inputs().get("mcheader", i); auto tracks = pc.inputs().get("mctracks", i); - LOG(debug) << "Updating collision table"; + LOG(detail) << "Updating collision table"; auto genID = updateMCCollisions(mCollisions.cursor, bcCounter, record.timeInBCNS * 1.e-3, @@ -108,12 +108,12 @@ struct MctracksToAod { 0, i); - LOG(debug) << "Updating HepMC tables"; + LOG(detail) << "Updating HepMC tables"; updateHepMCXSection(mXSections.cursor, bcCounter, genID, *header); updateHepMCPdfInfo(mPdfInfos.cursor, bcCounter, genID, *header); updateHepMCHeavyIon(mHeavyIons.cursor, bcCounter, genID, *header); - LOG(debug) << "Updating particles table"; + LOG(detail) << "Updating particles table"; TrackToIndex preselect; offset = updateParticles(mParticles.cursor, bcCounter, @@ -123,7 +123,7 @@ struct MctracksToAod { (bool)filt, false); - LOG(debug) << "Increment BC counter"; + LOG(detail) << "Increment BC counter"; bcCounter++; } From f578ecdb5611c5c73115557b65db89a26b4afad9 Mon Sep 17 00:00:00 2001 From: ALICE Action Bot Date: Thu, 23 Apr 2026 09:17:54 +0000 Subject: [PATCH 6/7] Please consider the following formatting changes --- Framework/Core/src/DeviceSpecHelpers.cxx | 2 +- Framework/Core/src/runDataProcessing.cxx | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Framework/Core/src/DeviceSpecHelpers.cxx b/Framework/Core/src/DeviceSpecHelpers.cxx index 49ee83e5aadfe..011b3aa12162f 100644 --- a/Framework/Core/src/DeviceSpecHelpers.cxx +++ b/Framework/Core/src/DeviceSpecHelpers.cxx @@ -1732,7 +1732,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic ("error-on-exit-transition-timeout", bpo::value()->zero_tokens(), "print error instead of warning when exit transition timer expires") // ("data-processing-timeout", bpo::value(), "timeout after which only calibration can happen") // ("expected-region-callbacks", bpo::value(), "region callbacks to expect before starting") // - ("timeframes-rate-limit", bpo::value()->default_value("0"), "how many timeframes can be in flight") // + ("timeframes-rate-limit", bpo::value()->default_value("0"), "how many timeframes can be in flight") // ("shm-monitor", bpo::value(), "whether to use the shared memory monitor") // ("channel-prefix", bpo::value()->default_value(""), "prefix to use for multiplexing multiple workflows in the same session") // ("bad-alloc-max-attempts", bpo::value()->default_value("1"), "throw after n attempts to alloc shm") // diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 41b49516b155b..d012e1656efc4 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -1063,7 +1063,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, ("exit-transition-timeout", bpo::value()->default_value(defaultExitTransitionTimeout), "how many second to wait before switching from RUN to READY") // ("error-on-exit-transition-timeout", bpo::value()->zero_tokens()->default_value(false), "print error instead of warning when exit transition timer expires") // ("data-processing-timeout", bpo::value()->default_value(defaultDataProcessingTimeout), "how many second to wait before stopping data processing and allowing data calibration") // - ("timeframes-rate-limit", bpo::value()->default_value("0"), "how many timeframe can be in flight at the same moment (0 disables)") // + ("timeframes-rate-limit", bpo::value()->default_value("0"), "how many timeframe can be in flight at the same moment (0 disables)") // ("configuration,cfg", bpo::value()->default_value("command-line"), "configuration backend") // ("infologger-mode", bpo::value()->default_value(defaultInfologgerMode), "O2_INFOLOGGER_MODE override"); r.fConfig.AddToCmdLineOptions(optsDesc, true); From 8f1bacd9a6cc63f9b715b8d30276ba899cb956dd Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Thu, 23 Apr 2026 12:47:09 +0200 Subject: [PATCH 7/7] update ignored list as sporadic --- Framework/Core/src/WorkflowHelpers.cxx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 619acfa06253c..5f1c1eaee5544 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -629,8 +629,8 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext if (tfnsource != workflow.end()) { O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting scheduled dummy sink"); // if there is a tfnsource, make sure the sink gets TFN/TFF - ignored.emplace_back("tfn", "TFN", "TFNumber"); - ignored.emplace_back("tff", "TFF", "TFFilename"); + DataSpecUtils::updateInputList(ignored, InputSpec{"tfn", "TFN", "TFNumber", 0, Lifetime::Sporadic}); + DataSpecUtils::updateInputList(ignored, InputSpec{"tff", "TFF", "TFFilename", 0, Lifetime::Sporadic}); extraSpecs.push_back(CommonDataProcessors::getScheduledDummySink(ignored)); } else { O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting rate limited dummy sink");