Skip to content

Commit 8fb56a2

Browse files
committed
DPL: move completed to be a stream variable
1 parent 280dac7 commit 8fb56a2

3 files changed

Lines changed: 12 additions & 7 deletions

File tree

Framework/Core/include/Framework/DataProcessingContext.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ struct DataProcessorContext {
4444

4545
// FIXME: move stuff here from the list below... ;-)
4646
ServiceRegistry* registry = nullptr;
47-
std::vector<DataRelayer::RecordAction> completed;
4847
std::vector<ExpirationHandler> expirationHandlers;
4948
AlgorithmSpec::InitCallback init;
5049
AlgorithmSpec::ProcessCallback statefulProcess;

Framework/Core/include/Framework/StreamContext.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#ifndef O2_FRAMEWORK_STREAMCONTEXT_H_
1212
#define O2_FRAMEWORK_STREAMCONTEXT_H_
1313

14+
#include "Framework/DataRelayer.h"
1415
#include "Framework/ServiceHandle.h"
1516
#include "ProcessingContext.h"
1617
#include "ServiceSpec.h"
@@ -64,6 +65,10 @@ struct StreamContext {
6465
// the callback will be called for all of them.
6566
std::vector<ServiceStartStreamHandle> preStartStreamHandles;
6667

68+
/// Per-stream list of actions ready to be dispatched. Populated by
69+
/// getReadyToProcess() and consumed by tryDispatchComputation().
70+
std::vector<DataRelayer::RecordAction> completed;
71+
6772
// Information on wether or not all the required routes have been created.
6873
// This is used to check if the LifetimeTimeframe routes were all created
6974
// for a given iteration.

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1644,6 +1644,7 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref)
16441644
void DataProcessingDevice::doRun(ServiceRegistryRef ref)
16451645
{
16461646
auto& context = ref.get<DataProcessorContext>();
1647+
auto& streamContext = ref.get<StreamContext>();
16471648
O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
16481649
auto& state = ref.get<DeviceState>();
16491650
auto& spec = ref.get<DeviceSpec const>();
@@ -1652,9 +1653,9 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
16521653
return;
16531654
}
16541655

1655-
context.completed.clear();
1656-
context.completed.reserve(16);
1657-
if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1656+
streamContext.completed.clear();
1657+
streamContext.completed.reserve(16);
1658+
if (DataProcessingDevice::tryDispatchComputation(ref, streamContext.completed)) {
16581659
state.lastActiveDataProcessor.store(&context);
16591660
}
16601661
DanglingContext danglingContext{*context.registry};
@@ -1668,8 +1669,8 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
16681669
state.lastActiveDataProcessor = &context;
16691670
}
16701671

1671-
context.completed.clear();
1672-
if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1672+
streamContext.completed.clear();
1673+
if (DataProcessingDevice::tryDispatchComputation(ref, streamContext.completed)) {
16731674
state.lastActiveDataProcessor = &context;
16741675
}
16751676

@@ -1695,7 +1696,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
16951696

16961697
bool shouldProcess = DataProcessingHelpers::hasOnlyGenerated(spec) == false;
16971698

1698-
while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && shouldProcess) {
1699+
while (DataProcessingDevice::tryDispatchComputation(ref, streamContext.completed) && shouldProcess) {
16991700
relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false);
17001701
}
17011702

0 commit comments

Comments
 (0)