Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -547,15 +547,24 @@ public ExecuteMultiOperationResponse executeMultiOperation(

@Override
public StartActivityExecutionResponse startActivity(StartActivityExecutionRequest request) {
Map<String, String> tags = tagsForStartActivity(request);
Scope scope = metricsScope.tagged(tags);
return grpcRetryer.retryWithResult(
() ->
service
.blockingStub()
.withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
.withOption(METRICS_TAGS_CALL_OPTIONS_KEY, scope)
.startActivityExecution(request),
grpcRetryerOptions);
}

private static Map<String, String> tagsForStartActivity(StartActivityExecutionRequest request) {
return new ImmutableMap.Builder<String, String>(2)
.put(MetricsTag.ACTIVITY_TYPE, request.getActivityType().getName())
.put(MetricsTag.TASK_QUEUE, request.getTaskQueue().getName())
.build();
}

@Override
public PollActivityExecutionResponse pollActivity(PollActivityExecutionRequest request) {
return grpcRetryer.retryWithResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static io.temporal.testUtils.Eventually.assertEventually;
import static io.temporal.testing.internal.SDKTestWorkflowRule.NAMESPACE;
import static junit.framework.TestCase.*;
import static org.junit.Assume.assumeTrue;

import com.uber.m3.tally.RootScopeBuilder;
import io.micrometer.core.instrument.*;
Expand All @@ -12,9 +13,7 @@
import io.temporal.activity.LocalActivityOptions;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.WorkflowTaskFailedCause;
import io.temporal.client.WorkflowFailedException;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.client.*;
import io.temporal.common.reporter.MicrometerClientStatsReporter;
import io.temporal.failure.ApplicationFailure;
import io.temporal.failure.CanceledFailure;
Expand Down Expand Up @@ -45,13 +44,18 @@ public class MetricsTest {
public final SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(QuicklyCompletingWorkflowImpl.class, MultiScenarioWorkflowImpl.class)
.setActivityImplementations(runCallbackActivity)
.setActivityImplementations(runCallbackActivity, new StandaloneMetricsActivityImpl())
.setMetricsScope(
new RootScopeBuilder()
.reporter(new MicrometerClientStatsReporter(registry))
.reportEvery(com.uber.m3.util.Duration.ofMillis(REPORTING_FLUSH_TIME >> 1)))
.build();

private final ActivityClient activityClient =
ActivityClient.newInstance(
testWorkflowRule.getWorkflowServiceStubs(),
ActivityClientOptions.newBuilder().setNamespace(SDKTestWorkflowRule.NAMESPACE).build());

private static final List<Tag> TAGS_NAMESPACE =
MetricsTag.defaultTags(NAMESPACE).entrySet().stream()
.map(
Expand Down Expand Up @@ -137,6 +141,35 @@ public void testAsynchronousStartAndGetResult() throws InterruptedException, Exe
});
}

@Test
public void testStandaloneActivityStartRequestTags() {
// SAA is not supported by in-mem test server yet
assumeTrue(SDKTestWorkflowRule.useExternalService);

activityClient.execute(
StandaloneMetricsActivity.class,
StandaloneMetricsActivity::run,
StartActivityOptions.newBuilder()
.setId("metrics-standalone-act-" + UUID.randomUUID())
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
.build());

List<Tag> startActivityRequestTags =
replaceTags(
tagsNamespaceQueue,
MetricsTag.OPERATION_NAME,
"StartActivityExecution",
MetricsTag.ACTIVITY_TYPE,
"StandaloneMetricsActivity");

assertEventually(
Duration.ofSeconds(2),
() ->
assertIntCounter(
1, registry.counter(MetricsType.TEMPORAL_REQUEST, startActivityRequestTags)));
}

@Test
public void testWorkflowSuccess() {
String result =
Expand Down Expand Up @@ -369,4 +402,15 @@ public void runCallback() {
}
}
}

@ActivityInterface
public interface StandaloneMetricsActivity {
@ActivityMethod(name = "StandaloneMetricsActivity")
void run();
}

public static class StandaloneMetricsActivityImpl implements StandaloneMetricsActivity {
@Override
public void run() {}
}
}