Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,8 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegi
});
}

// Callback to execute the processing. Notice how the data is
// is a vector of DataProcessorContext so that we can index the correct
// one with the thread id. For the moment we simply use the first one.
// Callback to execute the processing. Receives and relays data (doPrepare)
// happens on the main thread before this is queued, so we only dispatch here.
void run_callback(uv_work_t* handle)
{
auto* task = (TaskStreamInfo*)handle->data;
Expand All @@ -223,7 +222,6 @@ void run_callback(uv_work_t* handle)
auto& dataProcessorContext = ref.get<DataProcessorContext>();
O2_SIGNPOST_ID_FROM_POINTER(sid, device, &dataProcessorContext);
O2_SIGNPOST_START(device, sid, "run_callback", "Starting run callback on stream %d", task->id.index);
DataProcessingDevice::doPrepare(ref);
DataProcessingDevice::doRun(ref);
O2_SIGNPOST_END(device, sid, "run_callback", "Done processing data for stream %d", task->id.index);
}
Expand Down Expand Up @@ -1333,6 +1331,10 @@ void DataProcessingDevice::Run()
handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
}

// Receive and relay incoming data on the main thread so that I/O
// overlaps with computation running concurrently on work threads.
DataProcessingDevice::doPrepare(ref);

assert(mStreams.size() == mHandles.size());
/// Decide which task to use
TaskStreamRef streamRef{-1};
Expand Down