diff --git a/Framework/include/QualityControl/Aggregator.h b/Framework/include/QualityControl/Aggregator.h index 128c915d1c..34c5f6dcd6 100644 --- a/Framework/include/QualityControl/Aggregator.h +++ b/Framework/include/QualityControl/Aggregator.h @@ -51,10 +51,6 @@ struct AggregatorSpec; class Aggregator { public: - /// \brief Number of bytes in data description used for hashing of AggregatorRunner names. See HashDataDescription.h for details - static constexpr size_t descriptionHashLength = 4; - static_assert(descriptionHashLength <= o2::header::DataDescription::size); - /// Constructor /** * \brief Aggregator constructor @@ -84,7 +80,6 @@ class Aggregator void endOfActivity(const core::Activity& activity); static AggregatorConfig extractConfig(const core::CommonSpec&, const AggregatorSpec&); - static framework::OutputSpec createOutputSpec(const std::string& detector, const std::string& aggregatorName); private: /** diff --git a/Framework/include/QualityControl/AggregatorRunner.h b/Framework/include/QualityControl/AggregatorRunner.h index a31e4b5c9d..4066100325 100644 --- a/Framework/include/QualityControl/AggregatorRunner.h +++ b/Framework/include/QualityControl/AggregatorRunner.h @@ -79,10 +79,6 @@ struct AggregatorSource; /// \author Barthélémy von Haller class AggregatorRunner : public framework::Task { - /// \brief Number of bytes in data description used for hashing of AggregatorRunner names. See HashDataDescription.h for details - static constexpr size_t descriptionHashLength = 4; - static_assert(descriptionHashLength <= o2::header::DataDescription::size); - public: /// Constructor /** @@ -111,7 +107,6 @@ class AggregatorRunner : public framework::Task static framework::DataProcessorLabel getLabel() { return { "qc-aggregator" }; } static std::string createAggregatorRunnerIdString() { return "qc-aggregator"; }; static std::string createAggregatorRunnerName(); - static header::DataDescription createAggregatorRunnerDataDescription(const std::string& aggregatorName); /// \brief Compute the detector name to be used in the infologger for this runner. /// Compute the detector name to be used in the infologger for this runner. diff --git a/Framework/include/QualityControl/Check.h b/Framework/include/QualityControl/Check.h index 147b45ccd5..c53eede8ba 100644 --- a/Framework/include/QualityControl/Check.h +++ b/Framework/include/QualityControl/Check.h @@ -45,10 +45,6 @@ struct CheckSpec; /// \author Rafal Pacholek class Check { - /// \brief Number of bytes in data description used for hashing of Check descrition names. See HashDataDescription.h for details - static constexpr size_t descriptionHashLength = 4; - static_assert(descriptionHashLength <= o2::header::DataDescription::size); - public: /// Constructor /** @@ -78,21 +74,12 @@ class Check void startOfActivity(const core::Activity& activity); void endOfActivity(const core::Activity& activity); - // TODO: Unique Input string - static o2::header::DataDescription createCheckDataDescription(const std::string& checkName); - - /// \brief creates DataOrigin for Check task in form CDET - /// - /// \param detector Name of the detector to be used. If longer than 3B it will be truncated - static o2::header::DataOrigin createCheckDataOrigin(const std::string& detector); - UpdatePolicyType getUpdatePolicyType() const; std::vector getObjectsNames() const; bool getAllObjectsOption() const; // todo: probably make CheckFactory static CheckConfig extractConfig(const core::CommonSpec&, const CheckSpec&); - static framework::OutputSpec createOutputSpec(const std::string& detector, const std::string& checkName); private: void beautify(std::map>& moMap, const core::Quality& quality); diff --git a/Framework/include/QualityControl/PostProcessingDevice.h b/Framework/include/QualityControl/PostProcessingDevice.h index 8510539ab5..c5c94f8d52 100644 --- a/Framework/include/QualityControl/PostProcessingDevice.h +++ b/Framework/include/QualityControl/PostProcessingDevice.h @@ -36,10 +36,6 @@ struct PostProcessingRunnerConfig; /// \author Piotr Konopka class PostProcessingDevice : public framework::Task { - /// \brief Number of bytes in data description used for hashing DataDescription. See HashDataDescription.h for details - static constexpr size_t descriptionHashLength = 4; - static_assert(descriptionHashLength <= o2::header::DataDescription::size); - public: /// \brief Constructor /// @@ -61,10 +57,6 @@ class PostProcessingDevice : public framework::Task /// \brief Data Processor Label to identify all Task Runners static framework::DataProcessorLabel getLabel() { return { "qc-pp-task-runner" }; } static std::string createPostProcessingDeviceName(const std::string& taskName, const std::string& detectorName); - /// \brief Unified DataOrigin for Post-processing tasks - static header::DataOrigin createPostProcessingDataOrigin(const std::string& detectorCode); - /// \brief Unified DataDescription naming scheme for all Post-processing tasks - static header::DataDescription createPostProcessingDataDescription(const std::string& taskName); private: /// \brief Callback for CallbackService::Id::Start (DPL) a.k.a. RUN transition (FairMQ) diff --git a/Framework/include/QualityControl/TaskRunner.h b/Framework/include/QualityControl/TaskRunner.h index cc1e516e19..472a11788c 100644 --- a/Framework/include/QualityControl/TaskRunner.h +++ b/Framework/include/QualityControl/TaskRunner.h @@ -77,10 +77,6 @@ class ObjectsManager; /// \author Barthelemy von Haller class TaskRunner : public framework::Task { - /// \brief Number of bytes in data description used for hashing of Task names. See HashDataDescription.h for details - static constexpr size_t taskDescriptionHashLength = 4; - static_assert(taskDescriptionHashLength <= o2::header::DataDescription::size); - public: /// \brief Constructor /// @@ -109,10 +105,6 @@ class TaskRunner : public framework::Task static framework::DataProcessorLabel getTaskRunnerLabel() { return { "qc-task" }; } /// \brief ID string for all TaskRunner devices static std::string createTaskRunnerIdString(); - /// \brief Unified DataOrigin for Quality Control tasks - static header::DataOrigin createTaskDataOrigin(const std::string& detectorCode, bool movingWindows = false); - /// \brief Unified DataDescription naming scheme for all tasks - static header::DataDescription createTaskDataDescription(const std::string& taskName); /// \brief Unified DataDescription naming scheme for all timers static header::DataDescription createTimerDataDescription(const std::string& taskName); diff --git a/Framework/include/QualityControl/UserInputOutput.h b/Framework/include/QualityControl/UserInputOutput.h index 6087c5e1ad..eac2231342 100644 --- a/Framework/include/QualityControl/UserInputOutput.h +++ b/Framework/include/QualityControl/UserInputOutput.h @@ -19,6 +19,7 @@ #include +#include #include #include #include @@ -31,16 +32,27 @@ namespace o2::quality_control::core /// \brief returns a standard ConcreteDataMatcher for QC inputs and outputs framework::ConcreteDataMatcher - createUserDataMatcher(DataSourceType dataSourceType, const std::string& detectorName, const std::string& userCodeName); + createUserDataMatcher(DataSourceType dataSourceType, const std::string& detectorName, const std::string& userCodeName, + o2::header::DataHeader::SubSpecificationType subSpec = 0); -/// \brief returns a standard InputSpec for QC inputs and outputs +/// \brief returns a standard InputSpec for QC user data +/// +/// Returns a standard InputSpec for QC user data. The combination of the first four arguments should be unique +/// in a QC workflow. When provided binding is empty, userCodeName is used. If a Data Processor asks for multiple +/// inputs with the same userCodeName, a custom binding should be set. framework::InputSpec - createUserInputSpec(DataSourceType dataSourceType, const std::string& detectorName, const std::string& userCodeName); + createUserInputSpec(DataSourceType dataSourceType, const std::string& detectorName, const std::string& userCodeName, + o2::header::DataHeader::SubSpecificationType subSpec = 0, const std::string& binding = ""); -/// \brief returns a standard OutputSpec for QC inputs and outputs +/// \brief returns a standard OutputSpec for QC user data +/// +/// Returns a standard OutputSpec for QC user data. The combination of the first four arguments should be unique +/// in a QC workflow. When provided binding is empty, userCodeName is used. If a Data Processor asks for multiple +/// outputs with the same userCodeName, a custom binding should be set. framework::OutputSpec - createUserOutputSpec(DataSourceType dataSourceType, const std::string& detectorName, const std::string& userCodeName); + createUserOutputSpec(DataSourceType dataSourceType, const std::string& detectorName, const std::string& userCodeName, + o2::header::DataHeader::SubSpecificationType subSpec = 0, const framework::OutputLabel& binding = {}); } // namespace o2::quality_control::core -#endif // QUALITYCONTROL_USERINPUTOUTPUT_H \ No newline at end of file +#endif // QUALITYCONTROL_USERINPUTOUTPUT_H diff --git a/Framework/src/Aggregator.cxx b/Framework/src/Aggregator.cxx index 5a6934dd60..f5d5e30f62 100644 --- a/Framework/src/Aggregator.cxx +++ b/Framework/src/Aggregator.cxx @@ -26,7 +26,7 @@ #include "QualityControl/Activity.h" #include #include "QualityControl/CommonSpec.h" -#include "QualityControl/DataHeaderHelpers.h" +#include "QualityControl/UserInputOutput.h" #include #include @@ -253,33 +253,11 @@ AggregatorConfig Aggregator::extractConfig(const core::CommonSpec& commonSpec, c std::move(objectNames), checkAllObjects, std::move(inputs), - createOutputSpec(aggregatorSpec.detectorName, aggregatorSpec.aggregatorName), + createUserOutputSpec(DataSourceType::Aggregator, aggregatorSpec.detectorName, aggregatorSpec.aggregatorName), sources }; } -o2::header::DataOrigin createAggregatorDataOrigin(const std::string& detector) -{ - using Origin = o2::header::DataOrigin; - Origin header; - header.runtimeInit(std::string{ "A" }.append(detector.substr(0, Origin::size - 1)).c_str()); - return header; -} - -o2::header::DataDescription createAggregatorDataDescription(const std::string& aggregatorName) -{ - if (aggregatorName.empty()) { - BOOST_THROW_EXCEPTION(FatalException() << AliceO2::Common::errinfo_details("Empty aggregatorName for aggregator's data description")); - } - - return quality_control::core::createDataDescription(aggregatorName, Aggregator::descriptionHashLength); -} - -framework::OutputSpec Aggregator::createOutputSpec(const std::string& detector, const std::string& aggregatorName) -{ - return { createAggregatorDataOrigin(detector), createAggregatorDataDescription(aggregatorName), 0, framework::Lifetime::Sporadic }; -} - void Aggregator::startOfActivity(const core::Activity& activity) { if (mAggregatorInterface) { diff --git a/Framework/src/AggregatorRunner.cxx b/Framework/src/AggregatorRunner.cxx index 09422d4029..1e27cfea01 100644 --- a/Framework/src/AggregatorRunner.cxx +++ b/Framework/src/AggregatorRunner.cxx @@ -96,14 +96,6 @@ void AggregatorRunner::prepareOutputs() } } -header::DataDescription AggregatorRunner::createAggregatorRunnerDataDescription(const std::string& aggregatorName) -{ - if (aggregatorName.empty()) { - BOOST_THROW_EXCEPTION(FatalException() << errinfo_details("Empty taskName for task's data description")); - } - return quality_control::core::createDataDescription(aggregatorName, AggregatorRunner::descriptionHashLength); -} - std::string AggregatorRunner::createAggregatorRunnerName() { return AggregatorRunner::createAggregatorRunnerIdString(); // there is only one thus we can just take the idString diff --git a/Framework/src/Check.cxx b/Framework/src/Check.cxx index d0f151b314..416c7b4a17 100644 --- a/Framework/src/Check.cxx +++ b/Framework/src/Check.cxx @@ -29,7 +29,7 @@ #include "QualityControl/RootClassFactory.h" #include "QualityControl/QcInfoLogger.h" #include "QualityControl/Quality.h" -#include "QualityControl/DataHeaderHelpers.h" +#include "QualityControl/UserInputOutput.h" #include "QualityControl/ObjectMetadataHelpers.h" #include @@ -44,24 +44,6 @@ using namespace std; namespace o2::quality_control::checker { -/// Static functions -o2::header::DataDescription Check::createCheckDataDescription(const std::string& checkName) -{ - if (checkName.empty()) { - BOOST_THROW_EXCEPTION(FatalException() << errinfo_details("Empty checkName for check's data description")); - } - - return quality_control::core::createDataDescription(checkName, Check::descriptionHashLength); -} - -o2::header::DataOrigin Check::createCheckDataOrigin(const std::string& detector) -{ - using Origin = o2::header::DataOrigin; - Origin header; - header.runtimeInit(std::string{ "C" }.append(detector.substr(0, Origin::size - 1)).c_str()); - return header; -} - /// Members Check::Check(CheckConfig config) : mCheckConfig(std::move(config)) @@ -281,15 +263,10 @@ CheckConfig Check::extractConfig(const CommonSpec& commonSpec, const CheckSpec& checkAllObjects, allowBeautify, std::move(inputs), - createOutputSpec(checkSpec.detectorName, checkSpec.checkName), + createUserOutputSpec(DataSourceType::Check, checkSpec.detectorName, checkSpec.checkName), }; } -framework::OutputSpec Check::createOutputSpec(const std::string& detector, const std::string& checkName) -{ - return { createCheckDataOrigin(detector), createCheckDataDescription(checkName), 0, framework::Lifetime::Sporadic }; -} - void Check::startOfActivity(const core::Activity& activity) { if (mCheckInterface) { diff --git a/Framework/src/InfrastructureGenerator.cxx b/Framework/src/InfrastructureGenerator.cxx index 8d421c8f0b..b3ec45ee51 100644 --- a/Framework/src/InfrastructureGenerator.cxx +++ b/Framework/src/InfrastructureGenerator.cxx @@ -337,7 +337,7 @@ framework::WorkflowSpec InfrastructureGenerator::generateLocalBatchInfrastructur auto taskConfig = TaskRunnerFactory::extractConfig(infrastructureSpec.common, taskSpec, 0, 1); workflow.emplace_back(TaskRunnerFactory::create(taskConfig)); - fileSinkInputs.emplace_back(taskSpec.taskName, TaskRunner::createTaskDataOrigin(taskSpec.detectorName), TaskRunner::createTaskDataDescription(taskSpec.taskName), Lifetime::Sporadic); + fileSinkInputs.emplace_back(createUserInputSpec(DataSourceType::Task, taskSpec.detectorName, taskSpec.taskName)); } if (!fileSinkInputs.empty()) { @@ -377,9 +377,8 @@ framework::WorkflowSpec InfrastructureGenerator::generateRemoteBatchInfrastructu // We create an OutputSpec for moving windows for this task only if they are expected. if (!taskConfig.movingWindows.empty()) { fileSourceOutputs.push_back( - { RootFileSource::outputBinding(taskSpec.detectorName, taskSpec.taskName, true), - TaskRunner::createTaskDataOrigin(taskSpec.detectorName, true), - TaskRunner::createTaskDataDescription(taskSpec.taskName), 0, Lifetime::Sporadic }); + createUserOutputSpec(DataSourceType::TaskMovingWindow, taskSpec.detectorName, taskSpec.taskName, 0, + RootFileSource::outputBinding(taskSpec.detectorName, taskSpec.taskName, true))); } } if (!fileSourceOutputs.empty()) { @@ -530,7 +529,7 @@ void InfrastructureGenerator::generateLocalTaskLocalProxy(framework::WorkflowSpe std::string remotePort = std::to_string(taskSpec.remotePort); std::string proxyName = taskSpec.detectorName + "-" + taskName + "-proxy"; std::string channelName = taskSpec.detectorName + "-" + taskName + "-proxy"; - InputSpec proxyInput{ channelName, TaskRunner::createTaskDataOrigin(taskSpec.detectorName, false), TaskRunner::createTaskDataDescription(taskName), static_cast(id), Lifetime::Sporadic }; + InputSpec proxyInput = createUserInputSpec(DataSourceType::Task, taskSpec.detectorName, taskName, static_cast(id), channelName); std::string channelConfig = "name=" + channelName + ",type=pub,method=connect,address=tcp://" + taskSpec.remoteMachine + ":" + remotePort + ",rateLogging=60,transport=zeromq,sndBufSize=4"; @@ -554,7 +553,7 @@ void InfrastructureGenerator::generateLocalTaskRemoteProxy(framework::WorkflowSp Outputs proxyOutputs; for (size_t id = 1; id <= numberOfLocalMachines; id++) { proxyOutputs.emplace_back( - OutputSpec{ { channelName }, TaskRunner::createTaskDataOrigin(taskSpec.detectorName, false), TaskRunner::createTaskDataDescription(taskName), static_cast(id), Lifetime::Sporadic }); + createUserOutputSpec(DataSourceType::Task, taskSpec.detectorName, taskName, static_cast(id), { channelName })); } std::string channelConfig = "name=" + channelName + ",type=sub,method=bind,address=tcp://*:" + remotePort + @@ -586,20 +585,14 @@ void InfrastructureGenerator::generateMergers(framework::WorkflowSpec& workflow, Inputs mergerInputs; for (size_t id = 1; id <= numberOfLocalMachines; id++) { mergerInputs.emplace_back( - InputSpec{ { taskName + std::to_string(id) }, - TaskRunner::createTaskDataOrigin(detectorName, false), - TaskRunner::createTaskDataDescription(taskName), - static_cast(id), - Lifetime::Sporadic }); + createUserInputSpec(DataSourceType::Task, detectorName, taskName, static_cast(id), taskName + std::to_string(id))); } MergerInfrastructureBuilder mergersBuilder; mergersBuilder.setInfrastructureName(taskName); mergersBuilder.setInputSpecs(mergerInputs); - mergersBuilder.setOutputSpec( - { { "main" }, TaskRunner::createTaskDataOrigin(detectorName, false), TaskRunner::createTaskDataDescription(taskName), 0, Lifetime::Sporadic }); - mergersBuilder.setOutputSpecMovingWindow( - { { "main_mw" }, TaskRunner::createTaskDataOrigin(detectorName, true), TaskRunner::createTaskDataDescription(taskName), 0, Lifetime::Sporadic }); + mergersBuilder.setOutputSpec(createUserOutputSpec(DataSourceType::Task, detectorName, taskName, 0, { "main" })); + mergersBuilder.setOutputSpecMovingWindow(createUserOutputSpec(DataSourceType::TaskMovingWindow, detectorName, taskName, 0, { "main_mw" })); MergerConfig mergerConfig; // if we are to change the mode to Full, disable reseting tasks after each cycle. mergerConfig.inputObjectTimespan = { (mergingMode.empty() || mergingMode == "delta") ? InputObjectsTimespan::LastDifference : InputObjectsTimespan::FullHistory }; @@ -630,23 +623,20 @@ void InfrastructureGenerator::generateCheckRunners(framework::WorkflowSpec& work // todo: avoid code repetition for (const auto& taskSpec : infrastructureSpec.tasks | std::views::filter(&TaskSpec::active)) { - InputSpec taskOutput{ taskSpec.taskName, TaskRunner::createTaskDataOrigin(taskSpec.detectorName), TaskRunner::createTaskDataDescription(taskSpec.taskName), Lifetime::Sporadic }; + InputSpec taskOutput{ createUserInputSpec(DataSourceType::Task, taskSpec.detectorName, taskSpec.taskName) }; tasksOutputMap.insert({ DataSpecUtils::label(taskOutput), taskOutput }); bool movingWindowsEnabled = !taskSpec.movingWindows.empty(); bool synchronousRemote = taskSpec.location == TaskLocationSpec::Local && (infrastructureSpec.workflowType == WorkflowType::Remote || infrastructureSpec.workflowType == WorkflowType::FullChain); bool asynchronousRemote = infrastructureSpec.workflowType == WorkflowType::RemoteBatch; if (movingWindowsEnabled && (synchronousRemote || asynchronousRemote)) { - InputSpec taskMovingWindowOutput{ taskSpec.taskName, TaskRunner::createTaskDataOrigin(taskSpec.detectorName, true), TaskRunner::createTaskDataDescription(taskSpec.taskName), Lifetime::Sporadic }; + InputSpec taskMovingWindowOutput{ createUserInputSpec(DataSourceType::TaskMovingWindow, taskSpec.detectorName, taskSpec.taskName) }; tasksOutputMap.insert({ DataSpecUtils::label(taskMovingWindowOutput), taskMovingWindowOutput }); } } for (const auto& ppTaskSpec : infrastructureSpec.postProcessingTasks | std::views::filter(&PostProcessingTaskSpec::active)) { - InputSpec ppTaskOutput{ ppTaskSpec.taskName, - PostProcessingDevice::createPostProcessingDataOrigin(ppTaskSpec.detectorName), - PostProcessingDevice::createPostProcessingDataDescription(ppTaskSpec.taskName), - Lifetime::Sporadic }; + InputSpec ppTaskOutput{ createUserInputSpec(DataSourceType::PostProcessingTask, ppTaskSpec.detectorName, ppTaskSpec.taskName) }; tasksOutputMap.insert({ DataSpecUtils::label(ppTaskOutput), ppTaskOutput }); } @@ -779,27 +769,18 @@ void InfrastructureGenerator::generatePostProcessing(WorkflowSpec& workflow, con } } -template -auto createSinkInput(const std::string& detectorName, const std::string& name) -> framework::InputSpec -{ - const auto outputSpec = Type::createOutputSpec(detectorName, name); - auto input = DataSpecUtils::matchingInput(outputSpec); - input.binding = name; - return input; -} - void InfrastructureGenerator::generateBookkeepingQualitySink(WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec) { framework::Inputs sinkInputs{}; for (const auto& checkSpec : infrastructureSpec.checks | std::views::filter(&CheckSpec::active) | std::views::filter(&CheckSpec::exportToBookkeeping)) { ILOG(Debug, Support) << "Adding input to BookkeepingSink from check " << checkSpec.checkName << " and detector: " << checkSpec.detectorName << ENDM; - sinkInputs.emplace_back(createSinkInput(checkSpec.detectorName, checkSpec.checkName)); + sinkInputs.emplace_back(createUserInputSpec(DataSourceType::Check, checkSpec.detectorName, checkSpec.checkName)); } for (const auto& aggregatorSpec : infrastructureSpec.aggregators | std::views::filter(&AggregatorSpec::active) | std::views::filter(&AggregatorSpec::exportToBookkeeping)) { ILOG(Debug, Support) << "Adding input to BookkeepingSink from aggregator " << aggregatorSpec.aggregatorName << " and detector: " << aggregatorSpec.detectorName << ENDM; - sinkInputs.emplace_back(createSinkInput(aggregatorSpec.detectorName, aggregatorSpec.aggregatorName)); + sinkInputs.emplace_back(createUserInputSpec(DataSourceType::Aggregator, aggregatorSpec.detectorName, aggregatorSpec.aggregatorName)); } if (sinkInputs.empty()) { diff --git a/Framework/src/InfrastructureSpecReader.cxx b/Framework/src/InfrastructureSpecReader.cxx index a904b3570a..91f18cd71c 100644 --- a/Framework/src/InfrastructureSpecReader.cxx +++ b/Framework/src/InfrastructureSpecReader.cxx @@ -15,10 +15,7 @@ #include "QualityControl/InfrastructureSpecReader.h" #include "QualityControl/QcInfoLogger.h" -#include "QualityControl/TaskRunner.h" -#include "QualityControl/PostProcessingDevice.h" -#include "QualityControl/Check.h" -#include "QualityControl/AggregatorRunner.h" +#include "QualityControl/UserInputOutput.h" #include #include @@ -227,7 +224,7 @@ DataSourceSpec InfrastructureSpecReader::readSpecEntry(const std dss.name = wholeTree.get("qc.tasks." + dss.id + ".taskName", dss.id); auto detectorName = wholeTree.get("qc.tasks." + dss.id + ".detectorName"); - dss.inputs = { { dss.name, TaskRunner::createTaskDataOrigin(detectorName), TaskRunner::createTaskDataDescription(dss.name), 0, Lifetime::Sporadic } }; + dss.inputs = { createUserInputSpec(DataSourceType::Task, detectorName, dss.name) }; if (dataSourceTree.count("MOs") > 0) { for (const auto& moName : dataSourceTree.get_child("MOs")) { const auto mo = moName.second.get_value(); @@ -247,7 +244,7 @@ DataSourceSpec InfrastructureSpecReader::readSpecEntry(const std dss.name = taskName + "/mw"; auto detectorName = wholeTree.get("qc.tasks." + dss.id + ".detectorName"); - dss.inputs = { { dss.name, TaskRunner::createTaskDataOrigin(detectorName, true), TaskRunner::createTaskDataDescription(taskName), 0, Lifetime::Sporadic } }; + dss.inputs = { createUserInputSpec(DataSourceType::TaskMovingWindow, detectorName, taskName, 0, dss.name) }; if (dataSourceTree.count("MOs") > 0) { for (const auto& moName : dataSourceTree.get_child("MOs")) { const auto mo = moName.second.get_value(); @@ -265,7 +262,7 @@ DataSourceSpec InfrastructureSpecReader::readSpecEntry(const std // this allows us to have tasks with the same name for different detectors dss.name = wholeTree.get("qc.postprocessing." + dss.id + ".taskName", dss.id); auto detectorName = wholeTree.get("qc.postprocessing." + dss.id + ".detectorName"); - dss.inputs = { { dss.name, PostProcessingDevice::createPostProcessingDataOrigin(detectorName), PostProcessingDevice::createPostProcessingDataDescription(dss.id), 0, Lifetime::Sporadic } }; + dss.inputs = { createUserInputSpec(DataSourceType::PostProcessingTask, detectorName, dss.id, 0, dss.name) }; if (dataSourceTree.count("MOs") > 0) { for (const auto& moName : dataSourceTree.get_child("MOs")) { dss.subInputs.push_back(moName.second.get_value()); @@ -277,7 +274,7 @@ DataSourceSpec InfrastructureSpecReader::readSpecEntry(const std dss.id = dataSourceTree.get("name"); dss.name = wholeTree.get("qc.checks." + dss.id + ".checkName", dss.id); auto detectorName = wholeTree.get("qc.checks." + dss.id + ".detectorName"); - dss.inputs = { { dss.name, Check::createCheckDataOrigin(detectorName), Check::createCheckDataDescription(dss.name), 0, Lifetime::Sporadic } }; + dss.inputs = { createUserInputSpec(DataSourceType::Check, detectorName, dss.name) }; if (dataSourceTree.count("QOs") > 0) { for (const auto& moName : dataSourceTree.get_child("QOs")) { const auto qo = moName.second.get_value(); @@ -294,7 +291,7 @@ DataSourceSpec InfrastructureSpecReader::readSpecEntry(const std dss.id = dataSourceTree.get("name"); dss.name = wholeTree.get("qc.aggregators." + dss.id + ".checkName", dss.id); auto detectorName = wholeTree.get("qc.aggregators." + dss.id + ".detectorName"); - dss.inputs = { { dss.name, Check::createCheckDataOrigin(detectorName), AggregatorRunner::createAggregatorRunnerDataDescription(dss.name), 0, Lifetime::Sporadic } }; + dss.inputs = { createUserInputSpec(DataSourceType::Aggregator, detectorName, dss.name) }; if (dataSourceTree.count("QOs") > 0) { for (const auto& moName : dataSourceTree.get_child("QOs")) { const auto qo = moName.second.get_value(); diff --git a/Framework/src/PostProcessingDevice.cxx b/Framework/src/PostProcessingDevice.cxx index 4b004357d5..b4b17e905a 100644 --- a/Framework/src/PostProcessingDevice.cxx +++ b/Framework/src/PostProcessingDevice.cxx @@ -22,6 +22,7 @@ #include "QualityControl/PostProcessingRunnerConfig.h" #include "QualityControl/QcInfoLogger.h" #include "QualityControl/DataHeaderHelpers.h" +#include "QualityControl/UserInputOutput.h" #include "QualityControl/runnerUtils.h" #include @@ -33,8 +34,6 @@ using namespace AliceO2::Common; using namespace o2::framework; -constexpr auto outputBinding = "mo"; - namespace o2::quality_control::postprocessing { @@ -66,7 +65,7 @@ void PostProcessingDevice::run(framework::ProcessingContext& ctx) { // we set the publication callback each time, because we cannot be sure that // the reference to DataAllocator does not change - mRunner->setPublicationCallback(publishToDPL(ctx.outputs(), outputBinding)); + mRunner->setPublicationCallback(publishToDPL(ctx.outputs(), mRunnerConfig.taskName)); // When run returns false, it has done its processing. if (!mRunner->run()) { @@ -80,34 +79,6 @@ std::string PostProcessingDevice::createPostProcessingDeviceName(const std::stri return "qc-pp-" + detectorName + "-" + taskName; } -header::DataOrigin PostProcessingDevice::createPostProcessingDataOrigin(const std::string& detectorCode) -{ - // We need a unique Data Origin, so we can have PP Tasks with the same names for different detectors. - // However, to avoid colliding with data marked as e.g. TPC/CLUSTERS, we add 'P' to the data origin, so it is P. - std::string originStr = "P"; - if (detectorCode.empty()) { - ILOG(Warning, Support) << "empty detector code for a task data origin, trying to survive with: DET" << ENDM; - originStr += "DET"; - } else if (detectorCode.size() > 3) { - ILOG(Warning, Support) << "too long detector code for a task data origin: " + detectorCode + ", trying to survive with: " + detectorCode.substr(0, 3) << ENDM; - originStr += detectorCode.substr(0, 3); - } else { - originStr += detectorCode; - } - o2::header::DataOrigin origin; - origin.runtimeInit(originStr.c_str()); - return origin; -} - -header::DataDescription PostProcessingDevice::createPostProcessingDataDescription(const std::string& taskName) -{ - if (taskName.empty()) { - BOOST_THROW_EXCEPTION(FatalException() << errinfo_details("Empty taskName for pp-task's data description")); - } - - return quality_control::core::createDataDescription(taskName, PostProcessingDevice::descriptionHashLength); -} - void PostProcessingDevice::start(ServiceRegistryRef services) { mRunner->start(services); @@ -134,7 +105,7 @@ framework::Inputs PostProcessingDevice::getInputsSpecs() timerDescription.runtimeInit(std::string("T-" + mRunner->getID()).substr(0, o2::header::DataDescription::size).c_str()); return { { "timer-pp-" + mRunner->getID(), - createPostProcessingDataOrigin(mRunnerConfig.detectorName), + createDataOrigin(core::DataSourceType::PostProcessingTask, mRunnerConfig.detectorName), timerDescription, 0, Lifetime::Timer } }; @@ -142,7 +113,7 @@ framework::Inputs PostProcessingDevice::getInputsSpecs() framework::Outputs PostProcessingDevice::getOutputSpecs() const { - return { { { outputBinding }, createPostProcessingDataOrigin(mRunnerConfig.detectorName), createPostProcessingDataDescription(mRunnerConfig.taskName), 0, Lifetime::Sporadic } }; + return { createUserOutputSpec(core::DataSourceType::PostProcessingTask, mRunnerConfig.detectorName, mRunnerConfig.taskName) }; } framework::Options PostProcessingDevice::getOptions() diff --git a/Framework/src/TaskRunner.cxx b/Framework/src/TaskRunner.cxx index 61e19f95b2..82baae8a3d 100644 --- a/Framework/src/TaskRunner.cxx +++ b/Framework/src/TaskRunner.cxx @@ -49,7 +49,6 @@ #include "QualityControl/TimekeeperFactory.h" #include "QualityControl/ActivityHelpers.h" #include "QualityControl/WorkflowType.h" -#include "QualityControl/DataHeaderHelpers.h" #include "QualityControl/runnerUtils.h" #include @@ -238,34 +237,6 @@ std::string TaskRunner::createTaskRunnerIdString() return { "qc-task" }; } -header::DataOrigin TaskRunner::createTaskDataOrigin(const std::string& detectorCode, bool movingWindows) -{ - // We need a unique Data Origin, so we can have QC Tasks with the same names for different detectors. - // However, to avoid colliding with data marked as e.g. TPC/CLUSTERS, we add 'Q' to the data origin, so it is Q. - std::string originStr = movingWindows ? "W" : "Q"; - if (detectorCode.empty()) { - ILOG(Warning, Support) << "empty detector code for a task data origin, trying to survive with: DET" << ENDM; - originStr += "DET"; - } else if (detectorCode.size() > 3) { - ILOG(Warning, Support) << "too long detector code for a task data origin: " + detectorCode + ", trying to survive with: " + detectorCode.substr(0, 3) << ENDM; - originStr += detectorCode.substr(0, 3); - } else { - originStr += detectorCode; - } - o2::header::DataOrigin origin; - origin.runtimeInit(originStr.c_str()); - return origin; -} - -header::DataDescription TaskRunner::createTaskDataDescription(const std::string& taskName) -{ - if (taskName.empty()) { - BOOST_THROW_EXCEPTION(FatalException() << errinfo_details("Empty taskName for task's data description")); - } - - return quality_control::core::createDataDescription(taskName, TaskRunner::taskDescriptionHashLength); -} - header::DataDescription TaskRunner::createTimerDataDescription(const std::string& taskName) { if (taskName.empty()) { diff --git a/Framework/src/TaskRunnerFactory.cxx b/Framework/src/TaskRunnerFactory.cxx index b08a6d5cf2..91ff6a3930 100644 --- a/Framework/src/TaskRunnerFactory.cxx +++ b/Framework/src/TaskRunnerFactory.cxx @@ -22,6 +22,8 @@ #include "QualityControl/InfrastructureSpecReader.h" #include "QualityControl/TimekeeperFactory.h" #include "QualityControl/QcInfoLogger.h" +#include "QualityControl/DataHeaderHelpers.h" +#include "QualityControl/UserInputOutput.h" #include #include @@ -141,11 +143,11 @@ TaskRunnerConfig TaskRunnerFactory::extractConfig(const CommonSpec& globalConfig inputs.insert(inputs.begin(), globalTrackingDataRequest->inputs.begin(), globalTrackingDataRequest->inputs.end()); } - OutputSpec monitorObjectsSpec{ { "mo" }, - TaskRunner::createTaskDataOrigin(taskSpec.detectorName), - TaskRunner::createTaskDataDescription(taskSpec.taskName), - static_cast(parallelTaskID), - Lifetime::Sporadic }; + OutputSpec monitorObjectsSpec = createUserOutputSpec( + DataSourceType::Task, + taskSpec.detectorName, + taskSpec.taskName, + static_cast(parallelTaskID)); Options options{ { "period-timer-cycle", framework::VariantType::Int, static_cast(taskSpec.cycleDurationSeconds * 1000000), { "timer period" } }, @@ -209,7 +211,7 @@ InputSpec TaskRunnerFactory::createTimerInputSpec(const CommonSpec& globalConfig } return { "timer-cycle", - TaskRunner::createTaskDataOrigin(detectorName), + createDataOrigin(DataSourceType::Task, detectorName), TaskRunner::createTimerDataDescription(taskName), 0, Lifetime::Timer, diff --git a/Framework/src/UserInputOutput.cxx b/Framework/src/UserInputOutput.cxx index b9d05b2fc2..e7859602eb 100644 --- a/Framework/src/UserInputOutput.cxx +++ b/Framework/src/UserInputOutput.cxx @@ -20,37 +20,38 @@ namespace o2::quality_control::core { framework::ConcreteDataMatcher - createUserDataMatcher(DataSourceType dataSourceType, const std::string& detectorName, const std::string& userCodeName) + createUserDataMatcher(DataSourceType dataSourceType, const std::string& detectorName, const std::string& userCodeName, + o2::header::DataHeader::SubSpecificationType subSpec) { return { createDataOrigin(dataSourceType, detectorName), createDataDescription(userCodeName, dataSourceType), - 0 + subSpec }; } framework::InputSpec - createUserInputSpec(DataSourceType dataSourceType, const std::string& detectorName, const std::string& userCodeName) + createUserInputSpec(DataSourceType dataSourceType, const std::string& detectorName, const std::string& userCodeName, + o2::header::DataHeader::SubSpecificationType subSpec, const std::string& binding) { // currently all of our outputs are Lifetime::Sporadic, so we don't allow for customization, but it could be factored out. - // we assume using `userCodeName` as a binding in all cases return { - userCodeName, - createUserDataMatcher(dataSourceType, detectorName, userCodeName), + binding.empty() ? userCodeName : binding, + createUserDataMatcher(dataSourceType, detectorName, userCodeName, subSpec), framework::Lifetime::Sporadic }; } framework::OutputSpec - createUserOutputSpec(DataSourceType dataSourceType, const std::string& detectorName, const std::string& userCodeName) + createUserOutputSpec(DataSourceType dataSourceType, const std::string& detectorName, const std::string& userCodeName, + o2::header::DataHeader::SubSpecificationType subSpec, const framework::OutputLabel& binding) { // currently all of our outputs are Lifetime::Sporadic, so we don't allow for customization, but it could be factored out. - // we assume using `userCodeName` as a binding in all cases return { - framework::OutputLabel{ userCodeName }, - createUserDataMatcher(dataSourceType, detectorName, userCodeName), + binding.value.empty() ? framework::OutputLabel{ userCodeName } : binding, + createUserDataMatcher(dataSourceType, detectorName, userCodeName, subSpec), framework::Lifetime::Sporadic }; } -} // namespace o2::quality_control::core \ No newline at end of file +} // namespace o2::quality_control::core diff --git a/Framework/src/runBasic.cxx b/Framework/src/runBasic.cxx index 2510834ac4..c194d4c9fe 100644 --- a/Framework/src/runBasic.cxx +++ b/Framework/src/runBasic.cxx @@ -36,6 +36,7 @@ #include #include "QualityControl/InfrastructureGenerator.h" +#include "QualityControl/UserInputOutput.h" #include "Common/Exceptions.h" using namespace o2; @@ -121,7 +122,8 @@ WorkflowSpec defineDataProcessing(const ConfigContext& config) DataProcessorSpec printer{ .name = "printer", .inputs = Inputs{ - { "checked-mo", "CTST", Check::createCheckDataDescription(getFirstCheckName(qcConfigurationSource)), 0, Lifetime::Sporadic } }, + o2::quality_control::core::createUserInputSpec(o2::quality_control::core::DataSourceType::Check, "TST", + getFirstCheckName(qcConfigurationSource), 0, "checked-mo") }, .algorithm = adaptFromTask(), .labels = { { "resilient" } } }; @@ -130,7 +132,8 @@ WorkflowSpec defineDataProcessing(const ConfigContext& config) DataProcessorSpec printer{ .name = "printer", .inputs = Inputs{ - { "checked-mo", "CTST", TaskRunner::createTaskDataDescription(getFirstTaskName(qcConfigurationSource)), 0, Lifetime::Sporadic } }, + o2::quality_control::core::createUserInputSpec(o2::quality_control::core::DataSourceType::Task, "TST", + getFirstTaskName(qcConfigurationSource), 0, "checked-mo") }, .algorithm = adaptFromTask(), .labels = { { "resilient" } } diff --git a/Framework/test/testAggregatorRunner.cxx b/Framework/test/testAggregatorRunner.cxx index ef170a6d2d..91a5a62948 100644 --- a/Framework/test/testAggregatorRunner.cxx +++ b/Framework/test/testAggregatorRunner.cxx @@ -32,7 +32,6 @@ using namespace o2::quality_control::checker; using namespace std; using namespace o2::framework; using namespace o2::configuration; -using namespace o2::header; using namespace o2::quality_control::core; std::pair> getAggregatorConfigs(const std::string& configFilePath) @@ -50,13 +49,6 @@ std::pair> getAggregatorCo return { aggregatorRunnerConfig, aggregatorConfigs }; } -TEST_CASE("test_aggregator_runner_static") -{ - CHECK((AggregatorRunner::createAggregatorRunnerDataDescription("qwertyuiop") == DataDescription("qwertyuiop"))); - CHECK((AggregatorRunner::createAggregatorRunnerDataDescription("012345678901234567890") == DataDescription("012345678901639b"))); - CHECK_THROWS_AS(AggregatorRunner::createAggregatorRunnerDataDescription(""), AliceO2::Common::FatalException); -} - TEST_CASE("test_aggregator_runner") { std::string configFilePath = std::string("json://") + getTestDataDirectory() + "testSharedConfig.json"; diff --git a/Framework/test/testCheckWorkflow.cxx b/Framework/test/testCheckWorkflow.cxx index 8a769cbc27..2bb84037b4 100644 --- a/Framework/test/testCheckWorkflow.cxx +++ b/Framework/test/testCheckWorkflow.cxx @@ -18,10 +18,12 @@ #include #include #include "QualityControl/InfrastructureGenerator.h" +#include "QualityControl/UserInputOutput.h" using namespace o2; using namespace o2::framework; using namespace o2::utilities; +using namespace o2::quality_control::core; const std::string receiverName = "Receiver"; @@ -110,7 +112,7 @@ class Receiver : public o2::framework::Task { Inputs inputs; for (auto& checkName : mNames) { - inputs.push_back({ checkName, "CTST", Check::createCheckDataDescription(checkName), Lifetime::Sporadic }); + inputs.push_back({ checkName, createUserDataMatcher(DataSourceType::Check, "TST", checkName), Lifetime::Sporadic }); } return inputs; } diff --git a/Framework/test/testTaskRunner.cxx b/Framework/test/testTaskRunner.cxx index bea47c5923..7c8d8d5e56 100644 --- a/Framework/test/testTaskRunner.cxx +++ b/Framework/test/testTaskRunner.cxx @@ -17,6 +17,7 @@ #include "getTestDataDirectory.h" #include "QualityControl/TaskRunnerFactory.h" #include "QualityControl/TaskRunner.h" +#include "QualityControl/UserInputOutput.h" #include #include #include "QualityControl/InfrastructureSpecReader.h" @@ -69,7 +70,7 @@ BOOST_AUTO_TEST_CASE(test_factory) BOOST_CHECK(taskRunner.inputs[1].lifetime == Lifetime::Timer); BOOST_REQUIRE_EQUAL(taskRunner.outputs.size(), 1); - BOOST_CHECK_EQUAL(taskRunner.outputs[0], (OutputSpec{ { "mo" }, "QXXX", "abcTask", 123, Lifetime::Sporadic })); + BOOST_CHECK_EQUAL(taskRunner.outputs[0], createUserOutputSpec(DataSourceType::Task, "XXX", "abcTask", 123)); BOOST_CHECK(taskRunner.algorithm.onInit != nullptr); @@ -79,10 +80,6 @@ BOOST_AUTO_TEST_CASE(test_factory) BOOST_AUTO_TEST_CASE(test_task_runner_static) { - BOOST_CHECK_EQUAL(TaskRunner::createTaskDataOrigin("DET"), DataOrigin("QDET")); - BOOST_CHECK(TaskRunner::createTaskDataDescription("qwertyuiop") == DataDescription("qwertyuiop")); - BOOST_CHECK(TaskRunner::createTaskDataDescription("012345678901234567890") == DataDescription("012345678901639b")); - BOOST_CHECK_THROW(TaskRunner::createTaskDataDescription(""), AliceO2::Common::FatalException); BOOST_CHECK_EQUAL(TaskRunner::createTaskRunnerIdString(), "qc-task"); } @@ -98,7 +95,7 @@ BOOST_AUTO_TEST_CASE(test_task_runner) BOOST_CHECK_EQUAL(qcTask.getInputsSpecs()[0], DataSampling::InputSpecsForPolicy(dataSamplingTree, "tpcclust").at(0)); BOOST_CHECK(qcTask.getInputsSpecs()[1].lifetime == Lifetime::Timer); - BOOST_CHECK_EQUAL(qcTask.getOutputSpec(), (OutputSpec{ { "mo" }, "QXXX", "abcTask", 0, Lifetime::Sporadic })); + BOOST_CHECK_EQUAL(qcTask.getOutputSpec(), createUserOutputSpec(DataSourceType::Task, "XXX", "abcTask", 0)); BOOST_REQUIRE_EQUAL(qcTask.getOptions().size(), 3); BOOST_CHECK_EQUAL(qcTask.getOptions()[0].name, "period-timer-cycle"); diff --git a/Framework/test/testUserInputOutput.cxx b/Framework/test/testUserInputOutput.cxx index d8c3673a57..73f0568c60 100644 --- a/Framework/test/testUserInputOutput.cxx +++ b/Framework/test/testUserInputOutput.cxx @@ -28,26 +28,38 @@ namespace o2::quality_control::core TEST_CASE("ConcreteDataMatcher") { - auto dataMatcher = createUserDataMatcher(DataSourceType::Task, "TST", "mytask"); + auto dataMatcher = createUserDataMatcher(DataSourceType::Task, "TST", "mytask", 7); CHECK(dataMatcher.origin == DataOrigin{ "QTST" }); CHECK(dataMatcher.description == DataDescription{ "mytask" }); - CHECK(dataMatcher.subSpec == 0); + CHECK(dataMatcher.subSpec == 7); } TEST_CASE("InputSpec") { - auto inputSpec = createUserInputSpec(DataSourceType::Task, "TST", "mytask"); - CHECK(inputSpec.binding == "mytask"); - CHECK(inputSpec.lifetime == framework::Lifetime::Sporadic); - CHECK(framework::DataSpecUtils::match(inputSpec, framework::ConcreteDataMatcher{ DataOrigin{ "QTST" }, DataDescription{ "mytask" }, 0 })); + { + auto inputSpec = createUserInputSpec(DataSourceType::Task, "TST", "mytask", 3); + CHECK(inputSpec.binding == "mytask"); + CHECK(inputSpec.lifetime == framework::Lifetime::Sporadic); + CHECK(framework::DataSpecUtils::match(inputSpec, framework::ConcreteDataMatcher{ DataOrigin{ "QTST" }, DataDescription{ "mytask" }, 3 })); + } + { + auto inputSpec = createUserInputSpec(DataSourceType::Task, "TST", "mytask", 3, "custom_binding"); + CHECK(inputSpec.binding == "custom_binding"); + } } TEST_CASE("OutputSpec") { - auto outputSpec = createUserOutputSpec(DataSourceType::Task, "TST", "mytask"); - CHECK(outputSpec.binding.value == "mytask"); - CHECK(outputSpec.lifetime == framework::Lifetime::Sporadic); - CHECK(framework::DataSpecUtils::match(outputSpec, framework::ConcreteDataMatcher{ DataOrigin{ "QTST" }, DataDescription{ "mytask" }, 0 })); + { + auto outputSpec = createUserOutputSpec(DataSourceType::Task, "TST", "mytask", 5); + CHECK(outputSpec.binding.value == "mytask"); + CHECK(outputSpec.lifetime == framework::Lifetime::Sporadic); + CHECK(framework::DataSpecUtils::match(outputSpec, framework::ConcreteDataMatcher{ DataOrigin{ "QTST" }, DataDescription{ "mytask" }, 5 })); + } + { + auto outputSpec = createUserOutputSpec(DataSourceType::Task, "TST", "mytask", 5, { "custom_binding" }); + CHECK(outputSpec.binding.value == "custom_binding"); + } } } // namespace o2::quality_control::core diff --git a/Framework/test/testWorkflow.cxx b/Framework/test/testWorkflow.cxx index 4f45f4b338..3bf361b9d4 100644 --- a/Framework/test/testWorkflow.cxx +++ b/Framework/test/testWorkflow.cxx @@ -16,10 +16,12 @@ #include #include "QualityControl/InfrastructureGenerator.h" +#include "QualityControl/UserInputOutput.h" using namespace o2; using namespace o2::framework; using namespace o2::utilities; +using namespace o2::quality_control::core; void customize(std::vector& policies) { @@ -70,17 +72,19 @@ WorkflowSpec defineDataProcessing(ConfigContext const&) // Generation of the QC topology (one task, one checker in this case) quality_control::generateStandaloneInfrastructure(specs, configInterface->getRecursive()); + const auto checkName = getFirstCheckName(qcConfigurationSource); + // Finally the receiver DataProcessorSpec receiver{ "receiver", Inputs{ - { "checked-mo", "CTST", Check::createCheckDataDescription(getFirstCheckName(qcConfigurationSource)), 0, Lifetime::Sporadic } }, + createUserInputSpec(DataSourceType::Check, "TST", checkName) }, Outputs{}, AlgorithmSpec{ - [](ProcessingContext& pctx) { + [checkName](ProcessingContext& pctx) { // If any message reaches this point, the QC workflow should work at least on a basic level. - auto qo = pctx.inputs().get("checked-mo"); + auto qo = pctx.inputs().get(checkName); if (!qo) { ILOG(Error, Devel) << "Quality Object is a NULL" << ENDM; pctx.services().get().readyToQuit(QuitRequest::All);