From 7de942d33b4c8794a8a5d51bc071207c3553f6b7 Mon Sep 17 00:00:00 2001 From: ArtDu Date: Wed, 1 Apr 2026 15:36:34 +0300 Subject: [PATCH] test(tracing): add OpenTelemetry tracing example with Jaeger Add testOpenTelemetryTracingWithJaeger() demonstrating distributed tracing using OpenTelemetry API with Handlers: - Create spans for request lifecycle (onBeforeSend, onSuccess, onTimeout) - Parse request/response data using Jackson mapping - Export traces to Jaeger via TestContainer - Show flamegraph with timeout and late response handling The test is @Disabled by default as it requires Docker and keeps Jaeger running for manual inspection. Co-Authored-By: Claude Opus 4.6 --- tarantool-client/pom.xml | 25 ++ .../integration/TarantoolBoxClientTest.java | 331 ++++++++++++++++++ tarantool-java-sdk-bom/pom.xml | 33 ++ 3 files changed, 389 insertions(+) diff --git a/tarantool-client/pom.xml b/tarantool-client/pom.xml index cda876a4..3dbafbc3 100644 --- a/tarantool-client/pom.xml +++ b/tarantool-client/pom.xml @@ -66,6 +66,31 @@ 2.20.0 test + + io.opentelemetry + opentelemetry-api + test + + + io.opentelemetry + opentelemetry-sdk + test + + + io.opentelemetry + opentelemetry-sdk-testing + test + + + io.opentelemetry + opentelemetry-exporter-otlp + test + + + io.opentelemetry + opentelemetry-exporter-logging + test + diff --git a/tarantool-client/src/test/java/io/tarantool/client/integration/TarantoolBoxClientTest.java b/tarantool-client/src/test/java/io/tarantool/client/integration/TarantoolBoxClientTest.java index 3b052760..24131cbd 100644 --- a/tarantool-client/src/test/java/io/tarantool/client/integration/TarantoolBoxClientTest.java +++ b/tarantool-client/src/test/java/io/tarantool/client/integration/TarantoolBoxClientTest.java @@ -32,10 +32,17 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.core.type.TypeReference; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable; @@ -54,8 +61,20 @@ import static io.tarantool.client.box.TarantoolBoxSpace.WITHOUT_ENABLED_FETCH_SCHEMA_OPTION_FOR_TARANTOOL_LESS_3_0_0; import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_DATA; +import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_ERROR; import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_SYNC_ID; +import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_AUTH; import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_CALL; +import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_DELETE; +import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_EVAL; +import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_INSERT; +import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_PING; +import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_REPLACE; +import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_SELECT; +import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_UPDATE; +import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_UPSERT; +import static io.tarantool.core.protocol.requests.IProtoConstant.MP_IPROTO_FUNCTION_NAME; +import static io.tarantool.core.protocol.requests.IProtoConstant.MP_IPROTO_TUPLE; import static io.tarantool.mapping.BaseTarantoolJacksonMapping.objectMapper; import io.tarantool.client.BaseOptions; import io.tarantool.client.ClientType; @@ -1681,4 +1700,316 @@ void testGetServerVersion() throws Exception { TarantoolVersion version = client.getServerVersion().join(); assertEquals(tarantoolMajorVersion, version.getMajor()); } + + /** + * Demonstrates distributed tracing using OpenTelemetry API with Handlers. + * + *

This test shows how to integrate Tarantool client with OpenTelemetry to track request + * lifecycle: + * + *

+ * + *

Jaeger is started via TestContainer. To view traces, open the URL printed in console + * (http://localhost:16686) after test starts running. + */ + @Test + @Timeout(5000) + @Disabled + public void testOpenTelemetryTracingWithJaeger() throws Exception { + // Start Jaeger via TestContainer + org.testcontainers.containers.GenericContainer jaeger = + new org.testcontainers.containers.GenericContainer<>("jaegertracing/all-in-one:1.50") + .withExposedPorts(16686, 4317) + .withEnv("COLLECTOR_OTLP_ENABLED", "true"); + jaeger.start(); + + String jaegerUrl = "http://" + jaeger.getHost() + ":" + jaeger.getMappedPort(16686); + String otlpEndpoint = "http://" + jaeger.getHost() + ":" + jaeger.getMappedPort(4317); + + System.out.println("========================================"); + System.out.println("Jaeger UI available at: " + jaegerUrl); + System.out.println("OTLP endpoint: " + otlpEndpoint); + System.out.println("========================================"); + + // Set up OTLP exporter to send traces to Jaeger + io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter spanExporter = + io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter.builder() + .setEndpoint(otlpEndpoint) + .build(); + + io.opentelemetry.sdk.resources.Resource resource = + io.opentelemetry.sdk.resources.Resource.builder() + .put( + io.opentelemetry.api.common.AttributeKey.stringKey("service.name"), + "tarantool-java-sdk") + .put(io.opentelemetry.api.common.AttributeKey.stringKey("service.version"), "2.0.0") + .build(); + + SdkTracerProvider tracerProvider = + SdkTracerProvider.builder() + .setResource(resource) + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build(); + + OpenTelemetrySdk openTelemetry = + OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build(); + + Tracer tracer = openTelemetry.getTracer("tarantool-test"); + + // Maps to store active spans by syncId + java.util.Map requestSpans = new java.util.concurrent.ConcurrentHashMap<>(); + java.util.Map timeoutSpans = new java.util.concurrent.ConcurrentHashMap<>(); + + TarantoolBoxClient testClient = + TarantoolFactory.box() + .withUser(API_USER) + .withPassword(CREDS.get(API_USER)) + .withHost(tt.getHost()) + .withPort(tt.getFirstMappedPort()) + .withHandlers( + Handlers.builder() + .onBeforeSend( + request -> { + // Extract function name and arguments from request body + String functionName = extractFunctionName(request); + String args = extractArguments(request); + + String requestTypeName = getRequestTypeName(request.getRequestType()); + + // 1. Main request span - lives until final result (success or late + // response) + Span requestSpan = + tracer + .spanBuilder( + "tarantool." + requestTypeName + ":" + request.getSyncId()) + .setAttribute("request.type", request.getRequestType()) + .setAttribute("request.type.name", requestTypeName) + .setAttribute("sync.id", request.getSyncId()) + .setAttribute("tarantool.function", functionName) + .setAttribute("tarantool.arguments", args) + .startSpan(); + requestSpans.put(request.getSyncId(), requestSpan); + + // 2. Timeout span - created now, ended on timeout, removed on success + Span timeoutSpan = + tracer + .spanBuilder( + "tarantool." + + requestTypeName + + ".timeout:" + + request.getSyncId()) + .setParent( + io.opentelemetry.context.Context.current().with(requestSpan)) + .setAttribute("sync.id", request.getSyncId()) + .startSpan(); + timeoutSpans.put(request.getSyncId(), timeoutSpan); + }) + .onSuccess( + response -> { + // Remove timeout span (success happened, no timeout) + Span timeoutSpan = timeoutSpans.remove(response.getSyncId()); + if (timeoutSpan != null) { + // Don't end it, just discard - timeout didn't happen + } + + // End main request span + Span requestSpan = requestSpans.remove(response.getSyncId()); + if (requestSpan != null) { + requestSpan.setAttribute("response.sync_id", response.getSyncId()); + requestSpan.setStatus(StatusCode.OK); + + // Parse and log response data + String responseData = extractResponseData(response); + requestSpan.setAttribute("response.data", responseData); + + requestSpan.end(); + } + }) + .onTimeout( + request -> { + // End timeout span + Span timeoutSpan = timeoutSpans.remove(request.getSyncId()); + if (timeoutSpan != null) { + timeoutSpan.addEvent("Request timed out"); + timeoutSpan.setStatus(StatusCode.ERROR, "Request timed out"); + timeoutSpan.setAttribute("error.type", "timeout"); + timeoutSpan.recordException(new TimeoutException("Request timed out")); + timeoutSpan.end(); + } + }) + .onIgnoredResponse( + response -> { + // End main request span (late response arrived) + Span requestSpan = requestSpans.remove(response.getSyncId()); + if (requestSpan != null) { + requestSpan.addEvent("Late response arrived"); + requestSpan.setAttribute("response.sync_id", response.getSyncId()); + requestSpan.setAttribute("late.response", true); + + // Parse and log response data + String responseData = extractResponseData(response); + requestSpan.setAttribute("response.data", responseData); + + requestSpan.end(); + } + }) + .build()) + .build(); + + // Test successful request + List result = testClient.call("echo", Arrays.asList(42, "test")).join().get(); + assertEquals(Arrays.asList(42, "test"), result); + + // Wait for async callbacks and span export + Thread.sleep(1000); + + System.out.println("Sent successful request trace to Jaeger"); + + // Test timeout scenario + Options timeoutOptions = BaseOptions.builder().withTimeout(100L).build(); + + Exception ex = + assertThrows( + CompletionException.class, + () -> testClient.call("slow_echo", Arrays.asList(1, true), timeoutOptions).join()); + assertEquals(TimeoutException.class, ex.getCause().getClass()); + + // Wait for timeout handler and span export + Thread.sleep(1000); + + System.out.println("Sent timeout error trace to Jaeger"); + + // Print Jaeger URL again so user can view traces after test completes + System.out.println("========================================"); + System.out.println("Test completed. View traces at: " + jaegerUrl); + System.out.println("========================================"); + + // Keep Jaeger running for a moment to allow viewing traces + Thread.sleep(3000000); + jaeger.stop(); + openTelemetry.close(); + testClient.close(); + } + + /** Helper method to get human-readable request type name. */ + private String getRequestTypeName(int requestType) { + switch (requestType) { + case IPROTO_TYPE_SELECT: + return "SELECT"; + case IPROTO_TYPE_INSERT: + return "INSERT"; + case IPROTO_TYPE_REPLACE: + return "REPLACE"; + case IPROTO_TYPE_UPDATE: + return "UPDATE"; + case IPROTO_TYPE_DELETE: + return "DELETE"; + case IPROTO_TYPE_AUTH: + return "AUTH"; + case IPROTO_TYPE_EVAL: + return "EVAL"; + case IPROTO_TYPE_UPSERT: + return "UPSERT"; + case IPROTO_TYPE_CALL: + return "CALL"; + case IPROTO_TYPE_PING: + return "PING"; + default: + return "UNKNOWN(" + requestType + ")"; + } + } + + /** Helper method to extract function name from CALL request. */ + private String extractFunctionName(IProtoRequest request) { + try { + byte[] packetBytes = request.getPacket(org.msgpack.core.MessagePack.newDefaultBufferPacker()); + org.msgpack.core.MessageUnpacker unpacker = + org.msgpack.core.MessagePack.newDefaultUnpacker(packetBytes); + unpacker.unpackInt(); // Skip size prefix + unpacker.unpackValue(); // Skip header + org.msgpack.value.Value bodyValue = unpacker.unpackValue(); + + if (bodyValue.isMapValue()) { + org.msgpack.value.Value functionNameValue = + bodyValue.asMapValue().map().get(MP_IPROTO_FUNCTION_NAME); + if (functionNameValue != null && functionNameValue.isStringValue()) { + return functionNameValue.asStringValue().asString(); + } + } + } catch (Exception e) { + // Ignore parsing errors + } + return "unknown"; + } + + /** Helper method to extract arguments from CALL request. */ + private String extractArguments(IProtoRequest request) { + try { + byte[] packetBytes = request.getPacket(org.msgpack.core.MessagePack.newDefaultBufferPacker()); + org.msgpack.core.MessageUnpacker unpacker = + org.msgpack.core.MessagePack.newDefaultUnpacker(packetBytes); + unpacker.unpackInt(); // Skip size prefix + unpacker.unpackValue(); // Skip header + org.msgpack.value.Value bodyValue = unpacker.unpackValue(); + + if (bodyValue.isMapValue()) { + org.msgpack.value.Value tupleValue = bodyValue.asMapValue().map().get(MP_IPROTO_TUPLE); + if (tupleValue != null) { + return tupleValue.toString(); + } + } + } catch (Exception e) { + // Ignore parsing errors + } + return "[]"; + } + + /** Helper method to extract response data from IProtoResponse. */ + private String extractResponseData(IProtoResponse response) { + try { + StringBuilder sb = new StringBuilder(); + sb.append("{"); + + // Extract data from response body using Jackson mapping + java.util.Map byteBodyValues = + response.getByteBodyValues(); + + boolean first = true; + for (java.util.Map.Entry entry : + byteBodyValues.entrySet()) { + if (!first) { + sb.append(", "); + } + first = false; + + String keyName = getResponseFieldName(entry.getKey()); + Object value = + io.tarantool.mapping.BaseTarantoolJacksonMapping.readValue( + entry.getValue(), Object.class); + sb.append(keyName).append("=").append(value); + } + + sb.append("}"); + return sb.toString(); + } catch (Exception e) { + return "{error=" + e.getMessage() + "}"; + } + } + + /** Helper method to get human-readable response field name. */ + private String getResponseFieldName(int fieldCode) { + switch (fieldCode) { + case IPROTO_DATA: + return "data"; + case IPROTO_ERROR: + return "error"; + default: + return "field_" + fieldCode; + } + } } diff --git a/tarantool-java-sdk-bom/pom.xml b/tarantool-java-sdk-bom/pom.xml index af8d5dda..e911c257 100644 --- a/tarantool-java-sdk-bom/pom.xml +++ b/tarantool-java-sdk-bom/pom.xml @@ -33,6 +33,7 @@ 2.20.0 5.5.1 1.2.2 + 1.48.0 @@ -235,6 +236,38 @@ ${instancio.version} test + + + + io.opentelemetry + opentelemetry-api + ${opentelemetry.version} + test + + + io.opentelemetry + opentelemetry-sdk + ${opentelemetry.version} + test + + + io.opentelemetry + opentelemetry-sdk-testing + ${opentelemetry.version} + test + + + io.opentelemetry + opentelemetry-exporter-otlp + ${opentelemetry.version} + test + + + io.opentelemetry + opentelemetry-exporter-logging + ${opentelemetry.version} + test +