@@ -212,9 +212,8 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegi
212212 });
213213}
214214
215- // Callback to execute the processing. Notice how the data is
216- // is a vector of DataProcessorContext so that we can index the correct
217- // one with the thread id. For the moment we simply use the first one.
215+ // Callback to execute the processing. Receives and relays data (doPrepare)
216+ // happens on the main thread before this is queued, so we only dispatch here.
218217void run_callback (uv_work_t * handle)
219218{
220219 auto * task = (TaskStreamInfo*)handle->data ;
@@ -223,7 +222,6 @@ void run_callback(uv_work_t* handle)
223222 auto & dataProcessorContext = ref.get <DataProcessorContext>();
224223 O2_SIGNPOST_ID_FROM_POINTER (sid, device, &dataProcessorContext);
225224 O2_SIGNPOST_START (device, sid, " run_callback" , " Starting run callback on stream %d" , task->id .index );
226- DataProcessingDevice::doPrepare (ref);
227225 DataProcessingDevice::doRun (ref);
228226 O2_SIGNPOST_END (device, sid, " run_callback" , " Done processing data for stream %d" , task->id .index );
229227}
@@ -1333,6 +1331,10 @@ void DataProcessingDevice::Run()
13331331 handleRegionCallbacks (mServiceRegistry , mPendingRegionInfos );
13341332 }
13351333
1334+ // Receive and relay incoming data on the main thread so that I/O
1335+ // overlaps with computation running concurrently on work threads.
1336+ DataProcessingDevice::doPrepare (ref);
1337+
13361338 assert (mStreams .size () == mHandles .size ());
13371339 // / Decide which task to use
13381340 TaskStreamRef streamRef{-1 };
0 commit comments