Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static Builder expression() {
return error("https://serverlessworkflow.io/spec/1.0.0/errors/expression", 400);
}

public static Builder communication(int status, TaskContext context, Exception ex) {
public static Builder communication(int status, TaskContext context, Throwable ex) {
return communication(status, context, ex.getMessage());
}

Expand All @@ -54,13 +54,13 @@ public static Builder communication(TaskContext context, String title) {
return communication(Errors.COMMUNICATION.status(), context, title);
}

public static Builder runtime(int status, TaskContext context, Exception ex) {
public static Builder runtime(int status, TaskContext context, Throwable ex) {
return new Builder(Errors.RUNTIME.toString(), status)
.instance(context.position().jsonPointer())
.title(ex.getMessage());
Comment thread
fjtirado marked this conversation as resolved.
}

public static Builder runtime(TaskContext context, Exception ex) {
public static Builder runtime(TaskContext context, Throwable ex) {
return runtime(Errors.RUNTIME.status(), context, ex);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,19 @@
package io.serverlessworkflow.impl.executors.grpc;

import com.google.protobuf.Message;
import io.grpc.stub.StreamObserver;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowModelCollection;
import io.serverlessworkflow.impl.WorkflowModelFactory;
import java.util.concurrent.CompletableFuture;

class CollectionStreamObserver implements StreamObserver<Message> {
private final WorkflowModelCollection modelCollection;
private final WorkflowModelFactory modelFactory;
private final CompletableFuture<WorkflowModel> future;
class CollectionStreamObserver extends ModelStreamObserver<WorkflowModelCollection> {

public CollectionStreamObserver(WorkflowModelFactory modelFactory) {
this.modelCollection = modelFactory.createCollection();
this.modelFactory = modelFactory;
this.future = new CompletableFuture<>();
public CollectionStreamObserver(WorkflowContext workflowContext, TaskContext taskContext) {
super(workflowContext, taskContext);
this.model = modelFactory.createCollection();
}

@Override
public void onNext(Message value) {
modelCollection.add(ProtobufMessageUtils.convert(value, modelFactory));
}

@Override
public void onError(Throwable t) {
future.completeExceptionally(t);
}

@Override
public void onCompleted() {
future.complete(modelCollection);
}

public CompletableFuture<WorkflowModel> future() {
return future;
model.add(ProtobufMessageUtils.convert(value, modelFactory));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
import com.github.os72.protocjar.Protoc;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.ProtocolStringList;
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
import java.io.FileNotFoundException;
import java.io.IOException;
Expand All @@ -28,15 +32,20 @@
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FileDescriptorReader {

private static final Logger logger = LoggerFactory.getLogger(FileDescriptorReader.class);

static FileDescriptorProto readDescriptor(ExternalResourceHandler externalResourceHandler) {

static FileDescriptor readDescriptor(ExternalResourceHandler externalResourceHandler) {
Path grpcDir = null;
try (InputStream inputStream = externalResourceHandler.open()) {
grpcDir = Files.createTempDirectory("serverless-workflow-");
Expand All @@ -46,21 +55,14 @@ static FileDescriptorProto readDescriptor(ExternalResourceHandler externalResour
Files.copy(inputStream, protoFile);
Path descriptorOutput = grpcDir.resolve("descriptor.protobin");
generateFileDescriptor(grpcDir, protoFile, descriptorOutput);
return DescriptorProtos.FileDescriptorSet.newBuilder()
.mergeFrom(Files.readAllBytes(descriptorOutput))
.build()
.getFileList()
.stream()
.filter(n -> n.getName().equals(name))
.findAny()
.orElseThrow(
() ->
new IllegalStateException(
"There was something really wrong generating the protoc descriptor, it does not included a file named "
+ name));
return toFileDescriptor(
DescriptorProtos.FileDescriptorSet.newBuilder()
.mergeFrom(Files.readAllBytes(descriptorOutput))
.build(),
name);
} catch (IOException e) {
throw new UncheckedIOException(
"Unable to process gRPC proto file associated with resource "
"Unable to process gRPC proto file associated with resource: "
+ externalResourceHandler.name(),
e);
} finally {
Expand All @@ -70,6 +72,54 @@ static FileDescriptorProto readDescriptor(ExternalResourceHandler externalResour
}
}

private static FileDescriptor toFileDescriptor(FileDescriptorSet set, String name) {
List<FileDescriptorProto> remainingProtos = new ArrayList<>(set.getFileList());
Map<String, FileDescriptor> builtDescriptors = new HashMap<>();
while (!remainingProtos.isEmpty()) {
Iterator<FileDescriptorProto> iterator = remainingProtos.iterator();
boolean modified = false;
while (iterator.hasNext()) {
FileDescriptorProto proto = iterator.next();
ProtocolStringList requiredDependencies = proto.getDependencyList();
List<FileDescriptor> currentDependencies =
requiredDependencies.stream()
.map(builtDescriptors::get)
.filter(Objects::nonNull)
.toList();
if (requiredDependencies.size() == currentDependencies.size()) {
FileDescriptor fileDescriptor = buildFileDescriptor(proto, currentDependencies);
String protoName = proto.getName();
if (protoName.equals(name)) {
return fileDescriptor;
}
builtDescriptors.put(protoName, fileDescriptor);
iterator.remove();
modified = true;
}
}
if (!modified) {
throw new IllegalStateException(
"Unable to build valid gRPC descriptor from proto file associated with resource: "
+ name
+ ". There are missing or circular dependencies");
}
}
throw new IllegalStateException("Cannot build FileDescriptor for name: " + name);
}

private static FileDescriptor buildFileDescriptor(
FileDescriptorProto proto, List<FileDescriptor> currentDependencies) {
try {
return FileDescriptor.buildFrom(
proto, currentDependencies.toArray(new FileDescriptor[currentDependencies.size()]));
} catch (DescriptorValidationException e) {
throw new IllegalStateException(
"Unable to build valid gRPC descriptor from proto file associated with resource: "
+ proto.getName(),
e);
}
}

private static void deleteTempFiles(Path grpcDir) {
try {
Files.walkFileTree(
Expand Down Expand Up @@ -142,15 +192,10 @@ private static void generateFileDescriptorWithProcessBuilder(String[] protocArgs
String[] command = new String[protocArgs.length + 1];
command[0] = "protoc";
System.arraycopy(protocArgs, 0, command, 1, protocArgs.length);

ProcessBuilder processBuilder = new ProcessBuilder(command);

processBuilder.redirectErrorStream(true);

Process process = processBuilder.start();

int exitCode = process.waitFor();

if (exitCode != 0) {
throw new IOException(
"Unable to generate file descriptor using system protoc. Exit code: " + exitCode);
Expand Down
Loading
Loading