Skip to content

Commit ada17bf

Browse files
mcruzdevfjtirado
andauthored
Add support to gRPC (#1088)
* Add support to gRPC Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com> * Apply pull request suggestions Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com> * Point to proto in resources dir Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com> * Apply @fjtirado suggestions Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com> * Refactor asyncStreamingCall method Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com> * Apply spotless Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com> * Remove jackson Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com> * Add support to gRPC Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com> * Remove local var Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com> * Review suggestions Signed-off-by: fjtirado <ftirados@redhat.com> * Change FileDescriptorReader to be a interface Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com> * Add support to local system protoc binary Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com> * Undo changes on HttpExecutor Signed-off-by: fjtirado <ftirados@redhat.com> * FileDescriptorReader is a static classs, not an interface. Also reducing the number of public classes Signed-off-by: fjtirado <ftirados@redhat.com> --------- Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com> Signed-off-by: fjtirado <ftirados@redhat.com> Co-authored-by: fjtirado <ftirados@redhat.com> Co-authored-by: Francisco Javier Tirado Sarti <65240126+fjtirado@users.noreply.github.com>
1 parent 3aa1867 commit ada17bf

31 files changed

+1518
-2
lines changed

impl/grpc/pom.xml

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
2+
<modelVersion>4.0.0</modelVersion>
3+
<parent>
4+
<groupId>io.serverlessworkflow</groupId>
5+
<artifactId>serverlessworkflow-impl</artifactId>
6+
<version>8.0.0-SNAPSHOT</version>
7+
</parent>
8+
<artifactId>serverlessworkflow-impl-grpc</artifactId>
9+
<name>Serverless Workflow :: Impl :: gRPC</name>
10+
11+
<dependencies>
12+
<dependency>
13+
<groupId>io.serverlessworkflow</groupId>
14+
<artifactId>serverlessworkflow-impl-core</artifactId>
15+
</dependency>
16+
<dependency>
17+
<groupId>io.grpc</groupId>
18+
<artifactId>grpc-stub</artifactId>
19+
</dependency>
20+
<dependency>
21+
<groupId>com.google.protobuf</groupId>
22+
<artifactId>protobuf-java-util</artifactId>
23+
</dependency>
24+
<dependency>
25+
<groupId>io.serverlessworkflow</groupId>
26+
<artifactId>serverlessworkflow-impl-json</artifactId>
27+
</dependency>
28+
<dependency>
29+
<groupId>com.github.os72</groupId>
30+
<artifactId>protoc-jar</artifactId>
31+
</dependency>
32+
<dependency>
33+
<groupId>io.grpc</groupId>
34+
<artifactId>grpc-protobuf</artifactId>
35+
</dependency>
36+
</dependencies>
37+
</project>
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.grpc;
17+
18+
import com.google.protobuf.Message;
19+
import io.grpc.stub.StreamObserver;
20+
import io.serverlessworkflow.impl.WorkflowModel;
21+
import io.serverlessworkflow.impl.WorkflowModelCollection;
22+
import io.serverlessworkflow.impl.WorkflowModelFactory;
23+
import java.util.concurrent.CompletableFuture;
24+
25+
class CollectionStreamObserver implements StreamObserver<Message> {
26+
private final WorkflowModelCollection modelCollection;
27+
private final WorkflowModelFactory modelFactory;
28+
private final CompletableFuture<WorkflowModel> future;
29+
30+
public CollectionStreamObserver(WorkflowModelFactory modelFactory) {
31+
this.modelCollection = modelFactory.createCollection();
32+
this.modelFactory = modelFactory;
33+
this.future = new CompletableFuture<>();
34+
}
35+
36+
@Override
37+
public void onNext(Message value) {
38+
modelCollection.add(ProtobufMessageUtils.convert(value, modelFactory));
39+
}
40+
41+
@Override
42+
public void onError(Throwable t) {
43+
future.completeExceptionally(t);
44+
}
45+
46+
@Override
47+
public void onCompleted() {
48+
future.complete(modelCollection);
49+
}
50+
51+
public CompletableFuture<WorkflowModel> future() {
52+
return future;
53+
}
54+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.grpc;
17+
18+
import com.google.protobuf.DescriptorProtos;
19+
20+
record FileDescriptorContext(
21+
DescriptorProtos.FileDescriptorSet fileDescriptorSet, String inputProto) {}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.grpc;
17+
18+
import com.github.os72.protocjar.Protoc;
19+
import com.google.protobuf.DescriptorProtos;
20+
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
21+
import java.io.FileNotFoundException;
22+
import java.io.IOException;
23+
import java.io.InputStream;
24+
import java.io.UncheckedIOException;
25+
import java.nio.file.Files;
26+
import java.nio.file.Path;
27+
import java.nio.file.StandardCopyOption;
28+
import java.util.Optional;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
class FileDescriptorReader {
33+
34+
private static final Logger logger = LoggerFactory.getLogger(FileDescriptorReader.class);
35+
36+
static FileDescriptorContext readDescriptor(ExternalResourceHandler externalResourceHandler) {
37+
Path grpcDir =
38+
tryCreateTempGrpcDir()
39+
.orElseThrow(
40+
() -> new IllegalStateException("Could not create temporary gRPC directory"));
41+
42+
try (InputStream inputStream = externalResourceHandler.open()) {
43+
44+
Path protoFile = grpcDir.resolve(externalResourceHandler.name());
45+
if (!Files.exists(protoFile)) {
46+
Files.createDirectories(protoFile);
47+
}
48+
49+
Files.copy(inputStream, protoFile, StandardCopyOption.REPLACE_EXISTING);
50+
51+
Path descriptorOutput = grpcDir.resolve("descriptor.protobin");
52+
53+
try {
54+
55+
generateFileDescriptor(grpcDir, protoFile, descriptorOutput);
56+
57+
DescriptorProtos.FileDescriptorSet fileDescriptorSet =
58+
DescriptorProtos.FileDescriptorSet.newBuilder()
59+
.mergeFrom(Files.readAllBytes(descriptorOutput))
60+
.build();
61+
62+
return new FileDescriptorContext(fileDescriptorSet, externalResourceHandler.name());
63+
64+
} catch (IOException e) {
65+
throw new UncheckedIOException(
66+
"Unable to read external resource handler: " + externalResourceHandler.name(), e);
67+
}
68+
} catch (IOException e) {
69+
throw new UncheckedIOException("Unable to read descriptor file", e);
70+
}
71+
}
72+
73+
private static Optional<Path> tryCreateTempGrpcDir() {
74+
try {
75+
return Optional.of(Files.createTempDirectory("serverless-workflow-"));
76+
} catch (IOException e) {
77+
throw new UncheckedIOException("Error while creating temporary gRPC directory", e);
78+
}
79+
}
80+
81+
/**
82+
* Calls protoc binary with <code>--descriptor_set_out=</code> option set. First attempts to use
83+
* the embedded protoc from protoc-jar library. If that fails with FileNotFoundException
84+
* (unsupported architecture), falls back to using the system's installed protoc via
85+
* ProcessBuilder.
86+
*
87+
* @param grpcDir a temporary directory
88+
* @param protoFile the .proto file used by <code>protoc</code> to generate the file descriptor
89+
* @param descriptorOutput the output directory where the descriptor file will be generated
90+
*/
91+
private static void generateFileDescriptor(Path grpcDir, Path protoFile, Path descriptorOutput) {
92+
String[] protocArgs =
93+
new String[] {
94+
"--include_imports",
95+
"--descriptor_set_out=" + descriptorOutput.toAbsolutePath(),
96+
"-I",
97+
grpcDir.toAbsolutePath().toString(),
98+
protoFile.toAbsolutePath().toString()
99+
};
100+
101+
try {
102+
// First attempt: use protoc-jar library
103+
int status = Protoc.runProtoc(protocArgs);
104+
105+
if (status != 0) {
106+
throw new RuntimeException(
107+
"Unable to generate file descriptor, 'protoc' execution failed with status " + status);
108+
}
109+
} catch (FileNotFoundException e) {
110+
// Fallback: try using system's installed protoc
111+
logger.warn(
112+
"Protoc binary not available for this architecture via protoc-jar. "
113+
+ "Attempting to use system-installed protoc...");
114+
generateFileDescriptorWithProcessBuilder(protocArgs);
115+
} catch (InterruptedException e) {
116+
Thread.currentThread().interrupt();
117+
throw new RuntimeException("Unable to generate file descriptor", e);
118+
} catch (IOException e) {
119+
throw new UncheckedIOException("Unable to generate file descriptor", e);
120+
}
121+
}
122+
123+
/**
124+
* Fallback method to generate file descriptor using system's installed protoc via ProcessBuilder.
125+
*
126+
* @param protocArgs the arguments to pass to protoc command
127+
*/
128+
private static void generateFileDescriptorWithProcessBuilder(String[] protocArgs) {
129+
try {
130+
String[] command = new String[protocArgs.length + 1];
131+
command[0] = "protoc";
132+
System.arraycopy(protocArgs, 0, command, 1, protocArgs.length);
133+
134+
ProcessBuilder processBuilder = new ProcessBuilder(command);
135+
136+
processBuilder.redirectErrorStream(true);
137+
138+
Process process = processBuilder.start();
139+
140+
int exitCode = process.waitFor();
141+
142+
if (exitCode != 0) {
143+
throw new IllegalStateException(
144+
"Unable to generate file descriptor using system protoc. Exit code: " + exitCode);
145+
}
146+
} catch (IOException e) {
147+
throw new UncheckedIOException(
148+
"Unable to execute system protoc. Please ensure 'protoc' is installed and available in your system PATH.",
149+
e);
150+
} catch (InterruptedException e) {
151+
Thread.currentThread().interrupt();
152+
throw new IllegalStateException("Protoc execution was interrupted", e);
153+
}
154+
}
155+
156+
private FileDescriptorReader() {}
157+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.grpc;
17+
18+
import io.grpc.Channel;
19+
import io.grpc.ManagedChannelBuilder;
20+
import io.serverlessworkflow.impl.TaskContext;
21+
import io.serverlessworkflow.impl.WorkflowContext;
22+
23+
class GrpcChannelResolver {
24+
25+
static final String GRPC_CHANNEL_PROVIDER = "grpcChannelProvider";
26+
27+
static Channel channel(
28+
WorkflowContext workflowContext,
29+
TaskContext taskContext,
30+
GrpcRequestContext grpcRequestContext) {
31+
return workflowContext
32+
.definition()
33+
.application()
34+
.<Channel>additionalObject(GRPC_CHANNEL_PROVIDER, workflowContext, taskContext)
35+
.orElseGet(
36+
() ->
37+
ManagedChannelBuilder.forAddress(
38+
grpcRequestContext.address(), grpcRequestContext.port())
39+
.usePlaintext()
40+
.build());
41+
}
42+
}

0 commit comments

Comments
 (0)