From e11218107e0f115d02fb8e4a68421691bb4481d2 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Sun, 26 Apr 2026 16:03:34 +0200 Subject: [PATCH] DPL: make sure data preparation remains on the main thread This way we can process incoming data while doing computation separately, without having to worry about thready safety of the DataRelayer itself. --- Framework/Core/src/DataProcessingDevice.cxx | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index b062f2bf68a75..36545e281bd55 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -212,9 +212,8 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegi }); } -// Callback to execute the processing. Notice how the data is -// is a vector of DataProcessorContext so that we can index the correct -// one with the thread id. For the moment we simply use the first one. +// Callback to execute the processing. Receives and relays data (doPrepare) +// happens on the main thread before this is queued, so we only dispatch here. void run_callback(uv_work_t* handle) { auto* task = (TaskStreamInfo*)handle->data; @@ -223,7 +222,6 @@ void run_callback(uv_work_t* handle) auto& dataProcessorContext = ref.get(); O2_SIGNPOST_ID_FROM_POINTER(sid, device, &dataProcessorContext); O2_SIGNPOST_START(device, sid, "run_callback", "Starting run callback on stream %d", task->id.index); - DataProcessingDevice::doPrepare(ref); DataProcessingDevice::doRun(ref); O2_SIGNPOST_END(device, sid, "run_callback", "Done processing data for stream %d", task->id.index); } @@ -1333,6 +1331,10 @@ void DataProcessingDevice::Run() handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos); } + // Receive and relay incoming data on the main thread so that I/O + // overlaps with computation running concurrently on work threads. + DataProcessingDevice::doPrepare(ref); + assert(mStreams.size() == mHandles.size()); /// Decide which task to use TaskStreamRef streamRef{-1};