@@ -1854,11 +1854,56 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
18541854 VariableContextHelpers::getTimeslice(variables);
18551855 forwardInputs(ref, slot, dropped, oldestOutputInfo, false, true);
18561856 };
1857+
1858+ auto onInsertion = [](ServiceRegistryRef& ref, std::span<fair::mq::MessagePtr>& messages) {
1859+ O2_SIGNPOST_ID_GENERATE(sid, forwarding);
1860+
1861+ auto& spec = ref.get<DeviceSpec const>();
1862+ bool hasForwards = spec.forwards.empty() == false;
1863+ auto& context = ref.get<DataProcessorContext>();
1864+ if (context.canForwardEarly && hasForwards) {
1865+ O2_SIGNPOST_EVENT_EMIT(device, sid, "device", "Early forwardinding before injecting data into relayer.");
1866+ auto& timesliceIndex = ref.get<TimesliceIndex>();
1867+ auto oldestTimeslice = timesliceIndex.getOldestPossibleOutput();
1868+
1869+ auto& proxy = ref.get<FairMQDeviceProxy>();
1870+
1871+ O2_SIGNPOST_ID_GENERATE(sid, forwarding);
1872+ O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for incoming messages with oldestTimeslice %zu with copy",
1873+ oldestTimeslice.timeslice.value);
1874+ std::vector<fair::mq::Parts> forwardedParts;
1875+ forwardedParts.resize(proxy.getNumForwards());
1876+ DataProcessingHelpers::routeForwardedMessages(proxy, messages, forwardedParts, true, false);
1877+
1878+ for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
1879+ if (forwardedParts[fi].Size() == 0) {
1880+ continue;
1881+ }
1882+ ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi});
1883+ auto& parts = forwardedParts[fi];
1884+ if (info.policy == nullptr) {
1885+ O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi);
1886+ continue;
1887+ }
1888+ O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi);
1889+ info.policy->forward(parts, ChannelIndex{fi}, ref);
1890+ }
1891+ auto& asyncQueue = ref.get<AsyncQueue>();
1892+ auto& decongestion = ref.get<DecongestionService>();
1893+ O2_SIGNPOST_ID_GENERATE(aid, async_queue);
1894+ O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value);
1895+ AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate}
1896+ .user<DecongestionContext>({.ref = ref, .oldestTimeslice = oldestTimeslice}));
1897+ O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
1898+ }
1899+ };
1900+
18571901 auto relayed = relayer.relay(parts.At(headerIndex)->GetData(),
18581902 &parts.At(headerIndex),
18591903 input,
18601904 nMessages,
18611905 nPayloadsPerHeader,
1906+ onInsertion,
18621907 onDrop);
18631908 switch (relayed.type) {
18641909 case DataRelayer::RelayChoice::Type::Backpressured:
@@ -2273,9 +2318,13 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
22732318 bool consumeSomething = action.op == CompletionPolicy::CompletionOp::Consume || action.op == CompletionPolicy::CompletionOp::ConsumeExisting;
22742319
22752320 if (context.canForwardEarly && hasForwards && consumeSomething) {
2276- O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwainding: %{public}s.", fmt::format("{}", action.op).c_str());
2277- auto& timesliceIndex = ref.get<TimesliceIndex>();
2278- forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
2321+ // We used to do fowarding here, however we now do it much earlier.
2322+ // We still need to clean the inputs which were already consumed
2323+ // via ConsumeExisting and which still have an header to hold the slot.
2324+ // FIXME: do we? This should really happen when we do the forwarding on
2325+ // insertion, because otherwise we lose the relevant information on how to
2326+ // navigate the set of headers. We could actually rely on the messageset index,
2327+ // is that the right thing to do though?
22792328 }
22802329 markInputsAsDone(action.slot);
22812330
0 commit comments