From 727cb2e93a89417d9247cc04bcd301d474368737 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Mon, 29 Jun 2026 17:48:05 +0200 Subject: [PATCH] Execute grpcCalls on Application executor service Signed-off-by: fjtirado --- .../impl/WorkflowError.java | 6 +- .../grpc/CollectionStreamObserver.java | 34 +-- .../executors/grpc/FileDescriptorReader.java | 85 ++++-- .../impl/executors/grpc/GrpcExecutor.java | 257 ++++++++++-------- .../executors/grpc/GrpcExecutorBuilder.java | 41 +-- ...stContext.java => ItemStreamObserver.java} | 18 +- .../executors/grpc/ModelStreamObserver.java | 54 ++++ .../executors/grpc/ProtobufMessageUtils.java | 62 +---- 8 files changed, 317 insertions(+), 240 deletions(-) rename impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/{GrpcRequestContext.java => ItemStreamObserver.java} (55%) create mode 100644 impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ModelStreamObserver.java diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java index ee2c4a6c5..4afd744ac 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java @@ -35,7 +35,7 @@ public static Builder expression() { return error("https://serverlessworkflow.io/spec/1.0.0/errors/expression", 400); } - public static Builder communication(int status, TaskContext context, Exception ex) { + public static Builder communication(int status, TaskContext context, Throwable ex) { return communication(status, context, ex.getMessage()); } @@ -54,13 +54,13 @@ public static Builder communication(TaskContext context, String title) { return communication(Errors.COMMUNICATION.status(), context, title); } - public static Builder runtime(int status, TaskContext context, Exception ex) { + public static Builder runtime(int status, TaskContext context, Throwable ex) { return new Builder(Errors.RUNTIME.toString(), status) .instance(context.position().jsonPointer()) .title(ex.getMessage()); } - public static Builder runtime(TaskContext context, Exception ex) { + public static Builder runtime(TaskContext context, Throwable ex) { return runtime(Errors.RUNTIME.status(), context, ex); } diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/CollectionStreamObserver.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/CollectionStreamObserver.java index cd1c51ef7..1b0198410 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/CollectionStreamObserver.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/CollectionStreamObserver.java @@ -16,39 +16,19 @@ package io.serverlessworkflow.impl.executors.grpc; import com.google.protobuf.Message; -import io.grpc.stub.StreamObserver; -import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowModelCollection; -import io.serverlessworkflow.impl.WorkflowModelFactory; -import java.util.concurrent.CompletableFuture; -class CollectionStreamObserver implements StreamObserver { - private final WorkflowModelCollection modelCollection; - private final WorkflowModelFactory modelFactory; - private final CompletableFuture future; +class CollectionStreamObserver extends ModelStreamObserver { - public CollectionStreamObserver(WorkflowModelFactory modelFactory) { - this.modelCollection = modelFactory.createCollection(); - this.modelFactory = modelFactory; - this.future = new CompletableFuture<>(); + public CollectionStreamObserver(WorkflowContext workflowContext, TaskContext taskContext) { + super(workflowContext, taskContext); + this.model = modelFactory.createCollection(); } @Override public void onNext(Message value) { - modelCollection.add(ProtobufMessageUtils.convert(value, modelFactory)); - } - - @Override - public void onError(Throwable t) { - future.completeExceptionally(t); - } - - @Override - public void onCompleted() { - future.complete(modelCollection); - } - - public CompletableFuture future() { - return future; + model.add(ProtobufMessageUtils.convert(value, modelFactory)); } } diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java index f4eb62352..f6ceb6b88 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java @@ -18,6 +18,10 @@ import com.github.os72.protocjar.Protoc; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.DescriptorProtos.FileDescriptorProto; +import com.google.protobuf.DescriptorProtos.FileDescriptorSet; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import com.google.protobuf.Descriptors.FileDescriptor; +import com.google.protobuf.ProtocolStringList; import io.serverlessworkflow.impl.resources.ExternalResourceHandler; import java.io.FileNotFoundException; import java.io.IOException; @@ -28,6 +32,12 @@ import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,8 +45,7 @@ class FileDescriptorReader { private static final Logger logger = LoggerFactory.getLogger(FileDescriptorReader.class); - static FileDescriptorProto readDescriptor(ExternalResourceHandler externalResourceHandler) { - + static FileDescriptor readDescriptor(ExternalResourceHandler externalResourceHandler) { Path grpcDir = null; try (InputStream inputStream = externalResourceHandler.open()) { grpcDir = Files.createTempDirectory("serverless-workflow-"); @@ -46,21 +55,14 @@ static FileDescriptorProto readDescriptor(ExternalResourceHandler externalResour Files.copy(inputStream, protoFile); Path descriptorOutput = grpcDir.resolve("descriptor.protobin"); generateFileDescriptor(grpcDir, protoFile, descriptorOutput); - return DescriptorProtos.FileDescriptorSet.newBuilder() - .mergeFrom(Files.readAllBytes(descriptorOutput)) - .build() - .getFileList() - .stream() - .filter(n -> n.getName().equals(name)) - .findAny() - .orElseThrow( - () -> - new IllegalStateException( - "There was something really wrong generating the protoc descriptor, it does not included a file named " - + name)); + return toFileDescriptor( + DescriptorProtos.FileDescriptorSet.newBuilder() + .mergeFrom(Files.readAllBytes(descriptorOutput)) + .build(), + name); } catch (IOException e) { throw new UncheckedIOException( - "Unable to process gRPC proto file associated with resource " + "Unable to process gRPC proto file associated with resource: " + externalResourceHandler.name(), e); } finally { @@ -70,6 +72,54 @@ static FileDescriptorProto readDescriptor(ExternalResourceHandler externalResour } } + private static FileDescriptor toFileDescriptor(FileDescriptorSet set, String name) { + List remainingProtos = new ArrayList<>(set.getFileList()); + Map builtDescriptors = new HashMap<>(); + while (!remainingProtos.isEmpty()) { + Iterator iterator = remainingProtos.iterator(); + boolean modified = false; + while (iterator.hasNext()) { + FileDescriptorProto proto = iterator.next(); + ProtocolStringList requiredDependencies = proto.getDependencyList(); + List currentDependencies = + requiredDependencies.stream() + .map(builtDescriptors::get) + .filter(Objects::nonNull) + .toList(); + if (requiredDependencies.size() == currentDependencies.size()) { + FileDescriptor fileDescriptor = buildFileDescriptor(proto, currentDependencies); + String protoName = proto.getName(); + if (protoName.equals(name)) { + return fileDescriptor; + } + builtDescriptors.put(protoName, fileDescriptor); + iterator.remove(); + modified = true; + } + } + if (!modified) { + throw new IllegalStateException( + "Unable to build valid gRPC descriptor from proto file associated with resource: " + + name + + ". There are missing or circular dependencies"); + } + } + throw new IllegalStateException("Cannot build FileDescriptor for name: " + name); + } + + private static FileDescriptor buildFileDescriptor( + FileDescriptorProto proto, List currentDependencies) { + try { + return FileDescriptor.buildFrom( + proto, currentDependencies.toArray(new FileDescriptor[currentDependencies.size()])); + } catch (DescriptorValidationException e) { + throw new IllegalStateException( + "Unable to build valid gRPC descriptor from proto file associated with resource: " + + proto.getName(), + e); + } + } + private static void deleteTempFiles(Path grpcDir) { try { Files.walkFileTree( @@ -142,15 +192,10 @@ private static void generateFileDescriptorWithProcessBuilder(String[] protocArgs String[] command = new String[protocArgs.length + 1]; command[0] = "protoc"; System.arraycopy(protocArgs, 0, command, 1, protocArgs.length); - ProcessBuilder processBuilder = new ProcessBuilder(command); - processBuilder.redirectErrorStream(true); - Process process = processBuilder.start(); - int exitCode = process.waitFor(); - if (exitCode != 0) { throw new IOException( "Unable to generate file descriptor using system protoc. Exit code: " + exitCode); diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java index 930e1bc1c..de59c23e5 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java @@ -15,10 +15,9 @@ */ package io.serverlessworkflow.impl.executors.grpc; -import com.google.protobuf.DescriptorProtos.FileDescriptorProto; +import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; -import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import io.grpc.CallOptions; import io.grpc.Channel; @@ -26,36 +25,74 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.MethodType; import io.grpc.protobuf.ProtoUtils; import io.grpc.stub.ClientCalls; import io.grpc.stub.StreamObserver; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowError; -import io.serverlessworkflow.impl.WorkflowException; import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelCollection; import io.serverlessworkflow.impl.WorkflowValueResolver; import io.serverlessworkflow.impl.executors.CallableTask; +import java.io.IOException; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.UnaryOperator; public class GrpcExecutor implements CallableTask { public static final String GRPC_CHANNEL_PROVIDER = "grpcChannelProvider"; - private final GrpcRequestContext requestContext; private final WorkflowValueResolver> arguments; - private final FileDescriptorProto fileDescriptorProto; + private final Descriptors.MethodDescriptor methodDescriptor; + private final String address; + private final int port; + private final GRPCOperation operation; + private final MethodDescriptor callDescriptor; + + @FunctionalInterface + private interface GRPCOperation { + CompletableFuture apply( + WorkflowContext workflowContext, + TaskContext taskContext, + Map parameters, + Descriptors.MethodDescriptor methodDescriptor, + ClientCall call); + } public GrpcExecutor( - GrpcRequestContext requestContext, + String address, + int port, WorkflowValueResolver> arguments, - FileDescriptorProto fileDescriptorProto) { - this.requestContext = requestContext; + Descriptors.ServiceDescriptor serviceDescriptor, + Descriptors.MethodDescriptor methodDescriptor) { + this.address = address; + this.port = port; this.arguments = arguments; - this.fileDescriptorProto = fileDescriptorProto; + this.methodDescriptor = methodDescriptor; + MethodType methodType = getMethodType(methodDescriptor); + this.operation = + switch (methodType) { + case CLIENT_STREAMING -> GrpcExecutor::handleClientStreaming; + case BIDI_STREAMING -> GrpcExecutor::handleBidiStreaming; + case SERVER_STREAMING -> GrpcExecutor::handleServerStreaming; + case UNARY, UNKNOWN -> GrpcExecutor::handleAsyncUnary; + }; + this.callDescriptor = + MethodDescriptor.newBuilder() + .setType(methodType) + .setFullMethodName( + MethodDescriptor.generateFullMethodName( + serviceDescriptor.getFullName(), methodDescriptor.getName())) + .setRequestMarshaller( + ProtoUtils.marshaller( + DynamicMessage.newBuilder(methodDescriptor.getInputType()).buildPartial())) + .setResponseMarshaller( + ProtoUtils.marshaller( + DynamicMessage.newBuilder(methodDescriptor.getOutputType()).buildPartial())) + .build(); } @Override @@ -67,146 +104,132 @@ public CompletableFuture apply( private CompletableFuture buildGrpcCallExecutor( WorkflowContext workflowContext, TaskContext taskContext, Map arguments) { - + Optional providedChannel = + workflowContext + .definition() + .application() + .additionalObject(GRPC_CHANNEL_PROVIDER, workflowContext, taskContext); + Channel channel = + providedChannel.orElseGet( + () -> ManagedChannelBuilder.forAddress(address, port).usePlaintext().build()); + ClientCall call = + channel.newCall( + callDescriptor, + CallOptions.DEFAULT + .withWaitForReady() + .withExecutor(workflowContext.definition().application().executorService())); + CompletableFuture result = null; try { - Descriptors.FileDescriptor fileDescriptor = - Descriptors.FileDescriptor.buildFrom( - fileDescriptorProto, new Descriptors.FileDescriptor[] {}); - - Descriptors.ServiceDescriptor serviceDescriptor = - Objects.requireNonNull( - fileDescriptor.findServiceByName(requestContext.service()), - "Service not found: " + requestContext.service()); - - Descriptors.MethodDescriptor methodDescriptor = - Objects.requireNonNull( - serviceDescriptor.findMethodByName(requestContext.method()), - "Method not found: " + requestContext.method()); - - MethodDescriptor.MethodType methodType = ProtobufMessageUtils.getMethodType(methodDescriptor); - - Optional providedChannel = - workflowContext - .definition() - .application() - .additionalObject(GRPC_CHANNEL_PROVIDER, workflowContext, taskContext); - Channel channel = - providedChannel.orElseGet( - () -> - ManagedChannelBuilder.forAddress(requestContext.address(), requestContext.port()) - .usePlaintext() - .build()); - ClientCall call = - buildClientCall(channel, methodType, serviceDescriptor, methodDescriptor); - CompletableFuture result = - switch (methodType) { - case CLIENT_STREAMING -> - handleClientStreaming(workflowContext, arguments, methodDescriptor, call); - case BIDI_STREAMING -> - handleBidiStreaming(workflowContext, arguments, methodDescriptor, call); - case SERVER_STREAMING -> - handleServerStreaming(workflowContext, methodDescriptor, arguments, call); - case UNARY, UNKNOWN -> - handleAsyncUnary(workflowContext, methodDescriptor, arguments, call); - }; - return providedChannel.isEmpty() && channel instanceof ManagedChannel managedChannel - ? result.whenComplete((__, ___) -> managedChannel.shutdown()) - : result; - } catch (Descriptors.DescriptorValidationException | InvalidProtocolBufferException e) { - throw new WorkflowException(WorkflowError.runtime(taskContext, e).build()); + result = operation.apply(workflowContext, taskContext, arguments, methodDescriptor, call); + } finally { + if (providedChannel.isEmpty() && channel instanceof ManagedChannel managedChannel) { + if (result == null) { + managedChannel.shutdown(); + } else { + result = result.whenComplete((__, ___) -> managedChannel.shutdown()); + } + } } - } - - private static ClientCall buildClientCall( - Channel channel, - MethodDescriptor.MethodType methodType, - Descriptors.ServiceDescriptor serviceDescriptor, - Descriptors.MethodDescriptor methodDescriptor) { - return channel.newCall( - MethodDescriptor.newBuilder() - .setType(methodType) - .setFullMethodName( - MethodDescriptor.generateFullMethodName( - serviceDescriptor.getFullName(), methodDescriptor.getName())) - .setRequestMarshaller( - ProtoUtils.marshaller( - DynamicMessage.newBuilder(methodDescriptor.getInputType()).buildPartial())) - .setResponseMarshaller( - ProtoUtils.marshaller( - DynamicMessage.newBuilder(methodDescriptor.getOutputType()).buildPartial())) - .build(), - CallOptions.DEFAULT.withWaitForReady()); + return result; } private static CompletableFuture handleClientStreaming( WorkflowContext workflowContext, + TaskContext taskContext, Map parameters, Descriptors.MethodDescriptor methodDescriptor, ClientCall call) { - return ProtobufMessageUtils.asyncStreamingCall( + return asyncStreamingCall( parameters, methodDescriptor, responseObserver -> ClientCalls.asyncClientStreamingCall(call, responseObserver), - workflowContext.definition().application().modelFactory()); + workflowContext, + taskContext); } private static CompletableFuture handleBidiStreaming( WorkflowContext workflowContext, + TaskContext taskContext, Map parameters, Descriptors.MethodDescriptor methodDescriptor, ClientCall call) { - - return ProtobufMessageUtils.asyncStreamingCall( + return asyncStreamingCall( parameters, methodDescriptor, responseObserver -> ClientCalls.asyncBidiStreamingCall(call, responseObserver), - workflowContext.definition().application().modelFactory()); + workflowContext, + taskContext); } private static CompletableFuture handleServerStreaming( WorkflowContext workflowContext, - Descriptors.MethodDescriptor methodDescriptor, + TaskContext taskContext, Map parameters, - ClientCall call) - throws InvalidProtocolBufferException { - CollectionStreamObserver observer = - new CollectionStreamObserver(workflowContext.definition().application().modelFactory()); - ClientCalls.asyncServerStreamingCall( - call, ProtobufMessageUtils.buildMessage(methodDescriptor, parameters).build(), observer); + Descriptors.MethodDescriptor methodDescriptor, + ClientCall call) { + ModelStreamObserver observer = + new CollectionStreamObserver(workflowContext, taskContext); + try { + ClientCalls.asyncServerStreamingCall( + call, ProtobufMessageUtils.buildMessage(methodDescriptor, parameters).build(), observer); + } catch (IOException io) { + observer.onError(io); + } return observer.future(); } private static CompletableFuture handleAsyncUnary( WorkflowContext workflowContext, - Descriptors.MethodDescriptor methodDescriptor, + TaskContext taskContext, Map parameters, - ClientCall call) - throws InvalidProtocolBufferException { - - CompletableFuture future = new CompletableFuture<>(); - ClientCalls.asyncUnaryCall( - call, - ProtobufMessageUtils.buildMessage(methodDescriptor, parameters).build(), - new StreamObserver<>() { - private WorkflowModel model; - - @Override - public void onNext(Message value) { - model = - ProtobufMessageUtils.convert( - value, workflowContext.definition().application().modelFactory()); - } + Descriptors.MethodDescriptor methodDescriptor, + ClientCall call) { + ModelStreamObserver observer = + new ItemStreamObserver(workflowContext, taskContext); + try { + ClientCalls.asyncUnaryCall( + call, ProtobufMessageUtils.buildMessage(methodDescriptor, parameters).build(), observer); + } catch (IOException io) { + observer.onError(io); + } + return observer.future(); + } - @Override - public void onError(Throwable t) { - future.completeExceptionally(t); - } + private static CompletableFuture asyncStreamingCall( + Map parameters, + com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, + UnaryOperator> streamObserverFunction, + WorkflowContext workflowContext, + TaskContext taskContext) { + ModelStreamObserver responseObserver = + new CollectionStreamObserver(workflowContext, taskContext); + StreamObserver requestObserver = streamObserverFunction.apply(responseObserver); + try { + for (Object entry : parameters.entrySet()) { + requestObserver.onNext( + ProtobufMessageUtils.buildMessage( + entry, DynamicMessage.newBuilder(methodDescriptor.getInputType())) + .build()); + } + requestObserver.onCompleted(); + } catch (IOException e) { + requestObserver.onError(e); + } + return responseObserver.future(); + } - @Override - public void onCompleted() { - future.complete(model); - } - }); - return future; + private static MethodDescriptor.MethodType getMethodType( + com.google.protobuf.Descriptors.MethodDescriptor methodDesc) { + DescriptorProtos.MethodDescriptorProto methodDescProto = methodDesc.toProto(); + if (methodDescProto.getClientStreaming()) { + if (methodDescProto.getServerStreaming()) { + return MethodDescriptor.MethodType.BIDI_STREAMING; + } + return MethodDescriptor.MethodType.CLIENT_STREAMING; + } else if (methodDescProto.getServerStreaming()) { + return MethodDescriptor.MethodType.SERVER_STREAMING; + } else { + return MethodDescriptor.MethodType.UNARY; + } } } diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java index 72e1917ec..4c22b4a61 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java @@ -15,6 +15,8 @@ */ package io.serverlessworkflow.impl.executors.grpc; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Descriptors.FileDescriptor; import io.serverlessworkflow.api.types.CallGRPC; import io.serverlessworkflow.api.types.GRPCArguments; import io.serverlessworkflow.api.types.TaskBase; @@ -22,10 +24,10 @@ import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowUtils; -import io.serverlessworkflow.impl.WorkflowValueResolver; import io.serverlessworkflow.impl.executors.CallableTaskBuilder; import io.serverlessworkflow.impl.executors.CallableTaskFactory; import java.util.Map; +import java.util.Objects; public class GrpcExecutorBuilder implements CallableTaskBuilder { @@ -37,25 +39,30 @@ public boolean accept(Class clazz) { @Override public CallableTaskFactory init( CallGRPC task, WorkflowDefinition definition, WorkflowMutablePosition position) { - GRPCArguments with = task.getWith(); WithGRPCService service = with.getService(); - - WorkflowValueResolver> arguments = - WorkflowUtils.buildMapResolver( - definition.application(), - with.getArguments() != null ? with.getArguments().getAdditionalProperties() : Map.of()); - - GrpcRequestContext grpcRequestContext = - new GrpcRequestContext( - service.getHost(), service.getPort(), with.getMethod(), service.getName()); - + FileDescriptor fileDescriptor = + definition + .resourceLoader() + .loadStatic(with.getProto().getEndpoint(), FileDescriptorReader::readDescriptor); + Descriptors.ServiceDescriptor serviceDescriptor = + Objects.requireNonNull( + fileDescriptor.findServiceByName(service.getName()), + "Service not found: " + service.getName()); + Descriptors.MethodDescriptor methodDescriptor = + Objects.requireNonNull( + serviceDescriptor.findMethodByName(with.getMethod()), + "Method not found: " + with.getMethod()); return () -> new GrpcExecutor( - grpcRequestContext, - arguments, - definition - .resourceLoader() - .loadStatic(with.getProto().getEndpoint(), FileDescriptorReader::readDescriptor)); + service.getHost(), + service.getPort(), + WorkflowUtils.buildMapResolver( + definition.application(), + with.getArguments() != null + ? with.getArguments().getAdditionalProperties() + : Map.of()), + serviceDescriptor, + methodDescriptor); } } diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ItemStreamObserver.java similarity index 55% rename from impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java rename to impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ItemStreamObserver.java index cbdb57168..ddbecf830 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ItemStreamObserver.java @@ -15,4 +15,20 @@ */ package io.serverlessworkflow.impl.executors.grpc; -record GrpcRequestContext(String address, int port, String method, String service) {} +import com.google.protobuf.Message; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; + +class ItemStreamObserver extends ModelStreamObserver { + + public ItemStreamObserver(WorkflowContext workflowContext, TaskContext taskContext) { + super(workflowContext, taskContext); + model = modelFactory.fromNull(); + } + + @Override + public void onNext(Message value) { + model = ProtobufMessageUtils.convert(value, modelFactory); + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ModelStreamObserver.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ModelStreamObserver.java new file mode 100644 index 000000000..c89c39ef7 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ModelStreamObserver.java @@ -0,0 +1,54 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.google.protobuf.Message; +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowError; +import io.serverlessworkflow.impl.WorkflowException; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelFactory; +import java.util.concurrent.CompletableFuture; + +abstract class ModelStreamObserver implements StreamObserver { + protected T model; + protected final WorkflowModelFactory modelFactory; + private final CompletableFuture future; + private final TaskContext taskContext; + + public ModelStreamObserver(WorkflowContext workflowContext, TaskContext taskContext) { + this.modelFactory = workflowContext.definition().application().modelFactory(); + this.taskContext = taskContext; + this.future = new CompletableFuture<>(); + } + + @Override + public void onError(Throwable t) { + future.completeExceptionally( + new WorkflowException(WorkflowError.runtime(taskContext, t).build(), t)); + } + + @Override + public void onCompleted() { + future.complete(model); + } + + public CompletableFuture future() { + return future; + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java index 0c2eec488..c4484f56b 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java @@ -15,23 +15,16 @@ */ package io.serverlessworkflow.impl.executors.grpc; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; -import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import com.google.protobuf.util.JsonFormat; -import io.grpc.MethodDescriptor; -import io.grpc.stub.StreamObserver; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowModelFactory; import io.serverlessworkflow.impl.jackson.JsonUtils; import java.io.IOException; import java.io.UncheckedIOException; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.function.UnaryOperator; class ProtobufMessageUtils { @@ -45,58 +38,17 @@ static WorkflowModel convert(Message message, WorkflowModelFactory modelFactory) } } - static MethodDescriptor.MethodType getMethodType( - com.google.protobuf.Descriptors.MethodDescriptor methodDesc) { - DescriptorProtos.MethodDescriptorProto methodDescProto = methodDesc.toProto(); - if (methodDescProto.getClientStreaming()) { - if (methodDescProto.getServerStreaming()) { - return MethodDescriptor.MethodType.BIDI_STREAMING; - } - return MethodDescriptor.MethodType.CLIENT_STREAMING; - } else if (methodDescProto.getServerStreaming()) { - return MethodDescriptor.MethodType.SERVER_STREAMING; - } else { - return MethodDescriptor.MethodType.UNARY; - } - } - - static CompletableFuture asyncStreamingCall( - Map parameters, - com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, - UnaryOperator> streamObserverFunction, - WorkflowModelFactory modelFactory) { - CollectionStreamObserver responseObserver = new CollectionStreamObserver(modelFactory); - StreamObserver requestObserver = streamObserverFunction.apply(responseObserver); - for (Object entry : parameters.entrySet()) { - try { - requestObserver.onNext( - buildMessage(entry, DynamicMessage.newBuilder(methodDescriptor.getInputType())) - .build()); - } catch (InvalidProtocolBufferException e) { - requestObserver.onError(e); - } - } - requestObserver.onCompleted(); - return responseObserver.future(); - } - - static Message.Builder buildMessage(Object object, Message.Builder builder) - throws InvalidProtocolBufferException { - try { - // let's use Jackson to serialize the object to string for now, although we probably need to - // revisit this. - JsonFormat.parser().merge(JsonUtils.mapper().writeValueAsString(object), builder); - return builder; - } catch (JsonProcessingException e) { - throw new InvalidProtocolBufferException(e); - } + static Message.Builder buildMessage(Object object, Message.Builder builder) throws IOException { + // let's use Jackson to serialize the object to string for now, although we probably need to + // revisit this. + JsonFormat.parser().merge(JsonUtils.mapper().writeValueAsString(object), builder); + return builder; } static Message.Builder buildMessage( Descriptors.MethodDescriptor methodDescriptor, Map parameters) - throws InvalidProtocolBufferException { - DynamicMessage.Builder builder = DynamicMessage.newBuilder(methodDescriptor.getInputType()); - return buildMessage(parameters, builder); + throws IOException { + return buildMessage(parameters, DynamicMessage.newBuilder(methodDescriptor.getInputType())); } private ProtobufMessageUtils() {}