Skip to content

Commit d238cec

Browse files
committed
[Fix #1380] Adding AllStrategyCorrelationInfo customization.
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 0a9a29e commit d238cec

5 files changed

Lines changed: 146 additions & 40 deletions

File tree

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@
4343
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
4444
import io.serverlessworkflow.impl.resources.ResourceLoaderFactory;
4545
import io.serverlessworkflow.impl.resources.URITemplateResolver;
46+
import io.serverlessworkflow.impl.scheduler.AllStrategyCorrelationInfoFactory;
4647
import io.serverlessworkflow.impl.scheduler.DefaultWorkflowScheduler;
48+
import io.serverlessworkflow.impl.scheduler.InMemoryAllStrategyCorrelationInfo;
4749
import io.serverlessworkflow.impl.scheduler.WorkflowScheduler;
4850
import io.serverlessworkflow.impl.schema.SchemaValidator;
4951
import io.serverlessworkflow.impl.schema.SchemaValidatorFactory;
@@ -93,6 +95,7 @@ public class WorkflowApplication implements AutoCloseable {
9395
private final URI defaultCatalogURI;
9496
private final Collection<CallableTaskProxyBuilder> callableProxyBuilders;
9597
private final CloudEventPredicateFactory cloudEventPredicateFactory;
98+
private final AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory;
9699

97100
private WorkflowApplication(Builder builder) {
98101
this.taskFactory = builder.taskFactory;
@@ -122,6 +125,7 @@ private WorkflowApplication(Builder builder) {
122125
this.id = builder.id;
123126
this.callableProxyBuilders = builder.callableProxyBuilders;
124127
this.cloudEventPredicateFactory = builder.cloudEventPredicateFactory;
128+
this.allStrategyCorrelationInfoFactory = builder.allStrategyCorrelationInfoFactory;
125129
}
126130

127131
public TaskExecutorFactory taskFactory() {
@@ -240,6 +244,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
240244
private Optional<FunctionReader> functionReader;
241245
private URI defaultCatalogURI;
242246
private CloudEventPredicateFactory cloudEventPredicateFactory;
247+
private AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory;
243248

244249
private Builder() {
245250
ServiceLoader.load(NamedWorkflowAdditionalObject.class)
@@ -372,6 +377,12 @@ public Builder withCloudEventPredicateFactory(
372377
return this;
373378
}
374379

380+
public Builder withAllStrategyCorrelationInfo(
381+
AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory) {
382+
this.allStrategyCorrelationInfoFactory = allStrategyCorrelationInfoFactory;
383+
return this;
384+
}
385+
375386
public WorkflowApplication build() {
376387

377388
if (modelFactory == null) {
@@ -432,6 +443,10 @@ public WorkflowApplication build() {
432443
loadFirst(CloudEventPredicateFactory.class)
433444
.orElseGet(() -> new DefaultCloudEventPredicateFactory());
434445
}
446+
if (allStrategyCorrelationInfoFactory == null) {
447+
allStrategyCorrelationInfoFactory = definition -> new InMemoryAllStrategyCorrelationInfo();
448+
}
449+
435450
if (defaultCatalogURI == null) {
436451
defaultCatalogURI = URI.create("https://github.com/serverlessworkflow/catalog");
437452
}
@@ -559,4 +574,8 @@ public <T> Optional<T> additionalObject(
559574
public Collection<CallableTaskProxyBuilder> callableProxyBuilders() {
560575
return callableProxyBuilders;
561576
}
577+
578+
public AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory() {
579+
return allStrategyCorrelationInfoFactory;
580+
}
562581
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.scheduler;
17+
18+
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
20+
import java.util.Collection;
21+
import java.util.function.Consumer;
22+
23+
public interface AllStrategyCorrelationInfo extends AutoCloseable {
24+
Collection<CloudEvent> correlate(
25+
EventRegistrationBuilder reg, CloudEvent event, Consumer<Collection<CloudEvent>> starter);
26+
27+
default void close() {}
28+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.scheduler;
17+
18+
import io.serverlessworkflow.impl.WorkflowDefinition;
19+
import java.util.function.Function;
20+
21+
public interface AllStrategyCorrelationInfoFactory
22+
extends Function<WorkflowDefinition, AllStrategyCorrelationInfo> {}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.scheduler;
17+
18+
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
20+
import java.util.ArrayList;
21+
import java.util.Collection;
22+
import java.util.HashMap;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.function.Consumer;
26+
27+
public class InMemoryAllStrategyCorrelationInfo implements AllStrategyCorrelationInfo {
28+
29+
private final Map<EventRegistrationBuilder, List<CloudEvent>> correlatedEvents = new HashMap<>();
30+
31+
@Override
32+
public Collection<CloudEvent> correlate(
33+
EventRegistrationBuilder reg, CloudEvent event, Consumer<Collection<CloudEvent>> starter) {
34+
Collection<CloudEvent> collection = new ArrayList<>();
35+
// to minimize the critical section, conversion is done later, here we are
36+
// performing just collection, if any
37+
synchronized (correlatedEvents) {
38+
correlatedEvents.computeIfAbsent(reg, k -> new ArrayList<>()).add(event);
39+
Collection<List<CloudEvent>> events = correlatedEvents.values();
40+
if (satisfyCondition(events)) {
41+
for (List<CloudEvent> values : events) {
42+
collection.add(values.remove(0));
43+
}
44+
}
45+
}
46+
if (!collection.isEmpty()) {
47+
starter.accept(collection);
48+
}
49+
return collection;
50+
}
51+
52+
private boolean satisfyCondition(Collection<List<CloudEvent>> events) {
53+
for (List<CloudEvent> values : events) {
54+
if (values.isEmpty()) {
55+
return false;
56+
}
57+
}
58+
return true;
59+
}
60+
61+
@Override
62+
public void close() {
63+
correlatedEvents.clear();
64+
}
65+
}

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java

Lines changed: 12 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,17 @@
1515
*/
1616
package io.serverlessworkflow.impl.scheduler;
1717

18+
import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;
19+
1820
import io.cloudevents.CloudEvent;
1921
import io.serverlessworkflow.impl.WorkflowDefinition;
2022
import io.serverlessworkflow.impl.WorkflowModel;
2123
import io.serverlessworkflow.impl.WorkflowModelCollection;
2224
import io.serverlessworkflow.impl.events.EventConsumer;
2325
import io.serverlessworkflow.impl.events.EventRegistration;
24-
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
2526
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
2627
import java.util.ArrayList;
2728
import java.util.Collection;
28-
import java.util.HashMap;
29-
import java.util.List;
30-
import java.util.Map;
3129
import java.util.function.Function;
3230

3331
public class ScheduledEventConsumer implements AutoCloseable {
@@ -37,8 +35,8 @@ public class ScheduledEventConsumer implements AutoCloseable {
3735
private final EventRegistrationBuilderInfo builderInfo;
3836
private final EventConsumer eventConsumer;
3937
private final ScheduledInstanceRunnable instanceRunner;
40-
private Map<EventRegistrationBuilder, List<CloudEvent>> correlatedEvents;
41-
private Collection<EventRegistration> registrations = new ArrayList<>();
38+
private final Collection<EventRegistration> registrations = new ArrayList<>();
39+
private final AllStrategyCorrelationInfo allStrategyCorrelationInfo;
4240

4341
public ScheduledEventConsumer(
4442
WorkflowDefinition definition,
@@ -50,17 +48,21 @@ public ScheduledEventConsumer(
5048
this.builderInfo = builderInfo;
5149
this.instanceRunner = instanceRunner;
5250
this.eventConsumer = definition.application().eventConsumer();
51+
this.allStrategyCorrelationInfo =
52+
definition.application().allStrategyCorrelationInfoFactory().apply(definition);
5353
if (builderInfo.registrations().isAnd()
5454
&& builderInfo.registrations().registrations().size() > 1) {
55-
this.correlatedEvents = new HashMap<>();
5655
builderInfo
5756
.registrations()
5857
.registrations()
5958
.forEach(
6059
reg -> {
61-
correlatedEvents.put(reg, new ArrayList<>());
6260
registrations.add(
63-
eventConsumer.register(reg, ce -> consumeEvent(reg, (CloudEvent) ce)));
61+
eventConsumer.register(
62+
reg,
63+
ce ->
64+
allStrategyCorrelationInfo.correlate(
65+
reg, (CloudEvent) ce, this::start)));
6466
});
6567
} else {
6668
builderInfo
@@ -71,34 +73,6 @@ public ScheduledEventConsumer(
7173
}
7274
}
7375

74-
private void consumeEvent(EventRegistrationBuilder reg, CloudEvent ce) {
75-
Collection<Collection<CloudEvent>> collections = new ArrayList<>();
76-
// to minimize the critical section, conversion is done later, here we are
77-
// performing
78-
// just collection, if any
79-
synchronized (correlatedEvents) {
80-
correlatedEvents.get(reg).add((CloudEvent) ce);
81-
while (satisfyCondition()) {
82-
Collection<CloudEvent> collection = new ArrayList<>();
83-
for (List<CloudEvent> values : correlatedEvents.values()) {
84-
collection.add(values.remove(0));
85-
}
86-
collections.add(collection);
87-
}
88-
}
89-
// convert and start outside synchronized
90-
collections.forEach(this::start);
91-
}
92-
93-
private boolean satisfyCondition() {
94-
for (List<CloudEvent> values : correlatedEvents.values()) {
95-
if (values.isEmpty()) {
96-
return false;
97-
}
98-
}
99-
return true;
100-
}
101-
10276
protected void start(CloudEvent ce) {
10377
WorkflowModelCollection model = definition.application().modelFactory().createCollection();
10478
model.add(converter.apply(ce));
@@ -112,9 +86,7 @@ protected void start(Collection<CloudEvent> ces) {
11286
}
11387

11488
public void close() {
115-
if (correlatedEvents != null) {
116-
correlatedEvents.clear();
117-
}
11889
registrations.forEach(eventConsumer::unregister);
90+
safeClose(allStrategyCorrelationInfo);
11991
}
12092
}

0 commit comments

Comments
 (0)