diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java index a40e66bc4..cee4ffc89 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java @@ -547,15 +547,24 @@ public ExecuteMultiOperationResponse executeMultiOperation( @Override public StartActivityExecutionResponse startActivity(StartActivityExecutionRequest request) { + Map tags = tagsForStartActivity(request); + Scope scope = metricsScope.tagged(tags); return grpcRetryer.retryWithResult( () -> service .blockingStub() - .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, scope) .startActivityExecution(request), grpcRetryerOptions); } + private static Map tagsForStartActivity(StartActivityExecutionRequest request) { + return new ImmutableMap.Builder(2) + .put(MetricsTag.ACTIVITY_TYPE, request.getActivityType().getName()) + .put(MetricsTag.TASK_QUEUE, request.getTaskQueue().getName()) + .build(); + } + @Override public PollActivityExecutionResponse pollActivity(PollActivityExecutionRequest request) { return grpcRetryer.retryWithResult( diff --git a/temporal-sdk/src/test/java/io/temporal/client/functional/MetricsTest.java b/temporal-sdk/src/test/java/io/temporal/client/functional/MetricsTest.java index ce353e940..a7452fb37 100644 --- a/temporal-sdk/src/test/java/io/temporal/client/functional/MetricsTest.java +++ b/temporal-sdk/src/test/java/io/temporal/client/functional/MetricsTest.java @@ -3,6 +3,7 @@ import static io.temporal.testUtils.Eventually.assertEventually; import static io.temporal.testing.internal.SDKTestWorkflowRule.NAMESPACE; import static junit.framework.TestCase.*; +import static org.junit.Assume.assumeTrue; import com.uber.m3.tally.RootScopeBuilder; import io.micrometer.core.instrument.*; @@ -12,9 +13,7 @@ import io.temporal.activity.LocalActivityOptions; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.WorkflowTaskFailedCause; -import io.temporal.client.WorkflowFailedException; -import io.temporal.client.WorkflowOptions; -import io.temporal.client.WorkflowStub; +import io.temporal.client.*; import io.temporal.common.reporter.MicrometerClientStatsReporter; import io.temporal.failure.ApplicationFailure; import io.temporal.failure.CanceledFailure; @@ -45,13 +44,18 @@ public class MetricsTest { public final SDKTestWorkflowRule testWorkflowRule = SDKTestWorkflowRule.newBuilder() .setWorkflowTypes(QuicklyCompletingWorkflowImpl.class, MultiScenarioWorkflowImpl.class) - .setActivityImplementations(runCallbackActivity) + .setActivityImplementations(runCallbackActivity, new StandaloneMetricsActivityImpl()) .setMetricsScope( new RootScopeBuilder() .reporter(new MicrometerClientStatsReporter(registry)) .reportEvery(com.uber.m3.util.Duration.ofMillis(REPORTING_FLUSH_TIME >> 1))) .build(); + private final ActivityClient activityClient = + ActivityClient.newInstance( + testWorkflowRule.getWorkflowServiceStubs(), + ActivityClientOptions.newBuilder().setNamespace(SDKTestWorkflowRule.NAMESPACE).build()); + private static final List TAGS_NAMESPACE = MetricsTag.defaultTags(NAMESPACE).entrySet().stream() .map( @@ -137,6 +141,35 @@ public void testAsynchronousStartAndGetResult() throws InterruptedException, Exe }); } + @Test + public void testStandaloneActivityStartRequestTags() { + // SAA is not supported by in-mem test server yet + assumeTrue(SDKTestWorkflowRule.useExternalService); + + activityClient.execute( + StandaloneMetricsActivity.class, + StandaloneMetricsActivity::run, + StartActivityOptions.newBuilder() + .setId("metrics-standalone-act-" + UUID.randomUUID()) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build()); + + List startActivityRequestTags = + replaceTags( + tagsNamespaceQueue, + MetricsTag.OPERATION_NAME, + "StartActivityExecution", + MetricsTag.ACTIVITY_TYPE, + "StandaloneMetricsActivity"); + + assertEventually( + Duration.ofSeconds(2), + () -> + assertIntCounter( + 1, registry.counter(MetricsType.TEMPORAL_REQUEST, startActivityRequestTags))); + } + @Test public void testWorkflowSuccess() { String result = @@ -369,4 +402,15 @@ public void runCallback() { } } } + + @ActivityInterface + public interface StandaloneMetricsActivity { + @ActivityMethod(name = "StandaloneMetricsActivity") + void run(); + } + + public static class StandaloneMetricsActivityImpl implements StandaloneMetricsActivity { + @Override + public void run() {} + } }