diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index b062f2bf68a75..36545e281bd55 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -212,9 +212,8 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegi }); } -// Callback to execute the processing. Notice how the data is -// is a vector of DataProcessorContext so that we can index the correct -// one with the thread id. For the moment we simply use the first one. +// Callback to execute the processing. Receives and relays data (doPrepare) +// happens on the main thread before this is queued, so we only dispatch here. void run_callback(uv_work_t* handle) { auto* task = (TaskStreamInfo*)handle->data; @@ -223,7 +222,6 @@ void run_callback(uv_work_t* handle) auto& dataProcessorContext = ref.get(); O2_SIGNPOST_ID_FROM_POINTER(sid, device, &dataProcessorContext); O2_SIGNPOST_START(device, sid, "run_callback", "Starting run callback on stream %d", task->id.index); - DataProcessingDevice::doPrepare(ref); DataProcessingDevice::doRun(ref); O2_SIGNPOST_END(device, sid, "run_callback", "Done processing data for stream %d", task->id.index); } @@ -1333,6 +1331,10 @@ void DataProcessingDevice::Run() handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos); } + // Receive and relay incoming data on the main thread so that I/O + // overlaps with computation running concurrently on work threads. + DataProcessingDevice::doPrepare(ref); + assert(mStreams.size() == mHandles.size()); /// Decide which task to use TaskStreamRef streamRef{-1};