diff --git a/external-service-impl/rest/pom.xml b/external-service-impl/rest/pom.xml index 63b2443a8e0ab..847f88b0fe043 100644 --- a/external-service-impl/rest/pom.xml +++ b/external-service-impl/rest/pom.xml @@ -171,6 +171,28 @@ antlr4-runtime provided + + + io.opentelemetry.proto + opentelemetry-proto + 1.3.2-alpha + + + + com.google.protobuf + protobuf-java-util + 3.25.3 + + + + com.google.guava + guava + + + diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/filter/AuthorizationFilter.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/filter/AuthorizationFilter.java index d973933260cdb..1de9ebc0d5a4d 100644 --- a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/filter/AuthorizationFilter.java +++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/filter/AuthorizationFilter.java @@ -59,10 +59,13 @@ public AuthorizationFilter() throws AuthException { @Override public void filter(ContainerRequestContext containerRequestContext) throws IOException { + final String requestPath = containerRequestContext.getUriInfo().getPath(); if ("OPTIONS".equals(containerRequestContext.getMethod()) - || "ping".equals(containerRequestContext.getUriInfo().getPath()) - || (config.isEnableSwagger() - && "swagger.json".equals(containerRequestContext.getUriInfo().getPath()))) { + || "ping".equals(requestPath) + || (config.isEnableSwagger() && "swagger.json".equals(requestPath)) + // OTLP receivers are typically run without credentials; the receiver logs in on behalf + // of clients as the otlp_username configured in iotdb-system.properties. + || requestPath.startsWith("rest/v1/otlp/")) { return; } else if (!config.isEnableSwagger() && "swagger.json".equals(containerRequestContext.getUriInfo().getPath())) { diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpConverter.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpConverter.java new file mode 100644 index 0000000000000..298a9b7da1eec --- /dev/null +++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpConverter.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rest.protocol.otlp.v1; + +import org.apache.iotdb.commons.conf.CommonDescriptor; + +import com.google.protobuf.ByteString; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; + +import java.util.List; + +/** Stateless helpers for converting OTLP protobuf structures into IoTDB row values. */ +final class OtlpConverter { + + private static final String TIMESTAMP_PRECISION = + CommonDescriptor.getInstance().getConfig().getTimestampPrecision(); + + private static final char[] HEX_CHARS = "0123456789abcdef".toCharArray(); + + private OtlpConverter() {} + + /** + * Converts an OTLP Unix-nanoseconds timestamp to the unit IoTDB is currently configured to store. + * OTLP always emits unsigned 64-bit nanoseconds; we return the value in whatever precision the + * database uses so it can be handed directly to {@code InsertTabletStatement}. + */ + static long nanoToDbPrecision(final long unixNano) { + switch (TIMESTAMP_PRECISION) { + case "ns": + return unixNano; + case "us": + return unixNano / 1_000L; + case "ms": + default: + return unixNano / 1_000_000L; + } + } + + /** Lower-case hex encoding. Empty input yields "" so we never store null IDs. */ + static String bytesToHex(final ByteString bytes) { + if (bytes == null || bytes.isEmpty()) { + return ""; + } + final byte[] data = bytes.toByteArray(); + final char[] out = new char[data.length * 2]; + for (int i = 0; i < data.length; i++) { + final int b = data[i] & 0xFF; + out[i * 2] = HEX_CHARS[b >>> 4]; + out[i * 2 + 1] = HEX_CHARS[b & 0x0F]; + } + return new String(out); + } + + /** Serializes an OTLP KeyValueList as a JSON object. Returns {@code "{}"} for empty input. */ + static String attributesToJson(final List attributes) { + if (attributes == null || attributes.isEmpty()) { + return "{}"; + } + final StringBuilder sb = new StringBuilder(64); + sb.append('{'); + boolean first = true; + for (final KeyValue kv : attributes) { + if (!first) { + sb.append(','); + } + first = false; + appendJsonString(sb, kv.getKey()); + sb.append(':'); + appendAnyValue(sb, kv.getValue()); + } + sb.append('}'); + return sb.toString(); + } + + /** + * Fallback database name used when an OTLP request carries no {@code service.name} resource + * attribute. Matches the OpenTelemetry convention for unnamed services. + */ + static final String UNKNOWN_SERVICE_DATABASE = "unknown_service"; + + /** + * Derives a valid IoTDB database identifier from an OTLP {@code service.name}. Lower-cases the + * string and rewrites any character that is not a letter, digit, or underscore into an + * underscore; prefixes an underscore when the first character would be a digit (IoTDB identifiers + * must start with a letter or underscore). Empty / null service names fall back to {@link + * #UNKNOWN_SERVICE_DATABASE}. + * + *

Examples: {@code "claude-code" -> "claude_code"}, {@code "Gemini CLI" -> "gemini_cli"}, + * {@code "codex" -> "codex"}, {@code "" -> "unknown_service"}. + */ + static String deriveDatabaseName(final String serviceName) { + if (serviceName == null || serviceName.isEmpty()) { + return UNKNOWN_SERVICE_DATABASE; + } + final StringBuilder sb = new StringBuilder(serviceName.length()); + for (int i = 0; i < serviceName.length(); i++) { + final char c = serviceName.charAt(i); + if ((c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '_') { + sb.append(c); + } else if (c >= 'A' && c <= 'Z') { + sb.append((char) (c + 32)); + } else { + sb.append('_'); + } + } + if (sb.length() == 0) { + return UNKNOWN_SERVICE_DATABASE; + } + if (sb.charAt(0) >= '0' && sb.charAt(0) <= '9') { + sb.insert(0, '_'); + } + return sb.toString(); + } + + /** Looks up {@code service.name} from a resource attribute list. Returns "" if absent. */ + static String extractServiceName(final List resourceAttrs) { + if (resourceAttrs == null) { + return ""; + } + for (final KeyValue kv : resourceAttrs) { + if ("service.name".equals(kv.getKey())) { + final AnyValue v = kv.getValue(); + if (v != null && v.hasStringValue()) { + return v.getStringValue(); + } + } + } + return ""; + } + + /** Looks up an attribute by key from a flat attribute list, returning "" if missing. */ + static String extractAttribute(final List attrs, final String key) { + if (attrs == null) { + return ""; + } + for (final KeyValue kv : attrs) { + if (key.equals(kv.getKey())) { + return anyValueToString(kv.getValue()); + } + } + return ""; + } + + private static void appendAnyValue(final StringBuilder sb, final AnyValue value) { + if (value == null) { + sb.append("null"); + return; + } + switch (value.getValueCase()) { + case STRING_VALUE: + appendJsonString(sb, value.getStringValue()); + break; + case BOOL_VALUE: + sb.append(value.getBoolValue()); + break; + case INT_VALUE: + sb.append(value.getIntValue()); + break; + case DOUBLE_VALUE: + { + final double d = value.getDoubleValue(); + if (Double.isFinite(d)) { + sb.append(d); + } else { + appendJsonString(sb, Double.toString(d)); + } + break; + } + case ARRAY_VALUE: + { + sb.append('['); + boolean first = true; + for (final AnyValue item : value.getArrayValue().getValuesList()) { + if (!first) { + sb.append(','); + } + first = false; + appendAnyValue(sb, item); + } + sb.append(']'); + break; + } + case KVLIST_VALUE: + { + sb.append('{'); + boolean first = true; + for (final KeyValue kv : value.getKvlistValue().getValuesList()) { + if (!first) { + sb.append(','); + } + first = false; + appendJsonString(sb, kv.getKey()); + sb.append(':'); + appendAnyValue(sb, kv.getValue()); + } + sb.append('}'); + break; + } + case BYTES_VALUE: + appendJsonString(sb, bytesToHex(value.getBytesValue())); + break; + case VALUE_NOT_SET: + default: + sb.append("null"); + break; + } + } + + private static String anyValueToString(final AnyValue value) { + if (value == null) { + return ""; + } + switch (value.getValueCase()) { + case STRING_VALUE: + return value.getStringValue(); + case BOOL_VALUE: + return Boolean.toString(value.getBoolValue()); + case INT_VALUE: + return Long.toString(value.getIntValue()); + case DOUBLE_VALUE: + return Double.toString(value.getDoubleValue()); + case BYTES_VALUE: + return bytesToHex(value.getBytesValue()); + default: + // Fall back to full JSON encoding for nested structures so callers still get a usable + // string instead of the empty placeholder. + final StringBuilder sb = new StringBuilder(); + appendAnyValue(sb, value); + return sb.toString(); + } + } + + private static void appendJsonString(final StringBuilder sb, final String s) { + sb.append('"'); + for (int i = 0; i < s.length(); i++) { + final char c = s.charAt(i); + switch (c) { + case '"': + sb.append("\\\""); + break; + case '\\': + sb.append("\\\\"); + break; + case '\n': + sb.append("\\n"); + break; + case '\r': + sb.append("\\r"); + break; + case '\t': + sb.append("\\t"); + break; + case '\b': + sb.append("\\b"); + break; + case '\f': + sb.append("\\f"); + break; + default: + if (c < 0x20) { + sb.append(String.format("\\u%04x", (int) c)); + } else { + sb.append(c); + } + } + } + sb.append('"'); + } +} diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpHttp.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpHttp.java new file mode 100644 index 0000000000000..4169ab37b8a79 --- /dev/null +++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpHttp.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rest.protocol.otlp.v1; + +import com.google.protobuf.Message; +import com.google.protobuf.util.JsonFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import java.nio.charset.StandardCharsets; +import java.util.Locale; + +/** + * Shared HTTP plumbing for the three OTLP JAX-RS resources: parses the request body into the + * appropriate protobuf message (protobuf or JSON), and renders the empty success response in the + * encoding the client expects. + * + *

OTLP/HTTP spec: content types are {@code application/x-protobuf} (default, required) and + * {@code application/json} (optional). Other content types are rejected with 415. + */ +final class OtlpHttp { + + private static final Logger LOGGER = LoggerFactory.getLogger(OtlpHttp.class); + + static final String PROTOBUF = "application/x-protobuf"; + static final String JSON = "application/json"; + + private static final JsonFormat.Parser JSON_PARSER = JsonFormat.parser().ignoringUnknownFields(); + + private OtlpHttp() {} + + /** True if the request body should be parsed as protobuf; false for JSON. */ + static boolean isProtobuf(final HttpHeaders headers) { + final String raw = headers.getHeaderString(HttpHeaders.CONTENT_TYPE); + if (raw == null || raw.isEmpty()) { + // Default per OTLP spec. + return true; + } + final String contentType = raw.toLowerCase(Locale.ROOT); + if (contentType.startsWith(PROTOBUF)) { + return true; + } + if (contentType.startsWith(JSON)) { + return false; + } + // Fall back to protobuf for unrecognized types; the parser will fail cleanly if wrong. + return true; + } + + /** Parses the request body into {@code builder}. Caller provides a fresh builder. */ + static T parse( + final byte[] body, final T builder, final boolean protobuf) throws Exception { + if (body == null || body.length == 0) { + return builder; + } + if (protobuf) { + builder.mergeFrom(body); + } else { + JSON_PARSER.merge(new String(body, StandardCharsets.UTF_8), builder); + } + return builder; + } + + /** Builds an OTLP success response (empty message) in the client's preferred encoding. */ + static Response success(final Message emptyResponse, final boolean protobuf) { + if (protobuf) { + return Response.ok(emptyResponse.toByteArray(), PROTOBUF).build(); + } + // JSON for the empty response is "{}". + return Response.ok("{}", JSON).build(); + } + + /** Builds an OTLP partial-success response with a message describing what failed. */ + static Response partialFailure(final boolean protobuf, final String message) { + LOGGER.warn("OTLP request partially failed: {}", message); + if (protobuf) { + // Empty protobuf; the partial_success field would go here but keeping it minimal is fine. + return Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .type(PROTOBUF) + .entity(new byte[0]) + .build(); + } + return Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .type(JSON) + .entity("{\"message\":\"" + escapeJson(message) + "\"}") + .build(); + } + + /** Builds a client error response, e.g. on malformed input. */ + static Response badRequest(final boolean protobuf, final String message) { + LOGGER.debug("OTLP bad request: {}", message); + if (protobuf) { + return Response.status(Response.Status.BAD_REQUEST) + .type(PROTOBUF) + .entity(new byte[0]) + .build(); + } + return Response.status(Response.Status.BAD_REQUEST) + .type(JSON) + .entity("{\"message\":\"" + escapeJson(message) + "\"}") + .build(); + } + + /** Returns {@link MediaType#APPLICATION_OCTET_STREAM} compatible default if nothing matches. */ + static String responseContentType(final boolean protobuf) { + return protobuf ? PROTOBUF : JSON; + } + + private static String escapeJson(final String s) { + if (s == null) { + return ""; + } + return s.replace("\\", "\\\\").replace("\"", "\\\""); + } +} diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpIngestor.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpIngestor.java new file mode 100644 index 0000000000000..46d2b952d423a --- /dev/null +++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpIngestor.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rest.protocol.otlp.v1; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.SessionManager; +import org.apache.iotdb.db.queryengine.plan.Coordinator; +import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; +import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; + +/** + * Packs a column-oriented batch of OTLP rows into an {@link InsertTabletStatement} and hands it off + * to the {@link Coordinator} for the table model. The receiver builds rows via {@link + * OtlpTableBatch}; this class only cares about serializing those rows in the layout IoTDB expects. + */ +final class OtlpIngestor { + + private static final Logger LOGGER = LoggerFactory.getLogger(OtlpIngestor.class); + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + private static final Binary EMPTY_BINARY = new Binary("".getBytes(StandardCharsets.UTF_8)); + + private OtlpIngestor() {} + + /** Inserts a batch. No-ops if batch is empty. Returns true on success. */ + static boolean insert( + final String database, final IClientSession session, final OtlpTableBatch batch) { + if (batch.rowCount() == 0) { + return true; + } + + final SessionManager sessionManager = SessionManager.getInstance(); + final long queryId = sessionManager.requestQueryId(); + try { + session.setDatabaseName(database); + session.setSqlDialect(IClientSession.SqlDialect.TABLE); + + final InsertTabletStatement statement = buildStatement(batch); + final Metadata metadata = LocalExecutionPlanner.getInstance().metadata; + + // The legacy 8-arg overload takes the tree-model Statement subtype (InsertTabletStatement). + // The newer overloads require a relational.sql.ast.Statement, which does not apply here. + // This is the same overload the REST /insertTablet and MQTT handlers use. + @SuppressWarnings("deprecation") + final ExecutionResult result = + Coordinator.getInstance() + .executeForTableModel( + statement, + new SqlParser(), + session, + queryId, + sessionManager.getSessionInfo(session), + "", + metadata, + CONFIG.getQueryTimeoutThreshold()); + final TSStatus status = result.status; + final int code = status.getCode(); + if (code != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + LOGGER.warn( + "OTLP insert into {}.{} failed: code={}, message={}, rows={}", + database, + batch.tableName(), + code, + status.getMessage(), + batch.rowCount()); + return false; + } + return true; + } catch (final Exception e) { + LOGGER.warn("OTLP insert into {}.{} threw", database, batch.tableName(), e); + return false; + } finally { + Coordinator.getInstance().cleanupQueryExecution(queryId); + } + } + + private static InsertTabletStatement buildStatement(final OtlpTableBatch batch) { + final InsertTabletStatement statement = new InsertTabletStatement(); + statement.setDevicePath(new PartialPath(batch.tableName(), false)); + statement.setMeasurements(batch.columnNames()); + statement.setTimes(batch.times()); + statement.setDataTypes(batch.dataTypes()); + statement.setColumnCategories(batch.columnCategories()); + statement.setColumns(batch.columnValues()); + statement.setBitMaps(batch.bitMaps()); + statement.setRowCount(batch.rowCount()); + statement.setAligned(false); + statement.setWriteToTable(true); + return statement; + } + + /** + * Column-major row buffer with a fixed schema. Callers push rows with {@link #startRow(long)} + * followed by {@code set*} calls, then hand the batch to {@link OtlpIngestor#insert}. Unset cells + * become IoTDB nulls by way of the BitMap: the default IoTDB convention is that a fresh BitMap + * has every position cleared (= not-null), so we pre-mark every slot and let the per-column + * {@code set*} methods do nothing on null input — this way any slot that ends up untouched stays + * marked. + */ + static final class OtlpTableBatch { + private final String tableName; + private final String[] columnNames; + private final TSDataType[] dataTypes; + private final TsTableColumnCategory[] columnCategories; + private final int capacity; + + private final long[] times; + private final Object[] columnValues; + private final BitMap[] bitMaps; + private int rowCount; + + OtlpTableBatch( + final String tableName, + final String[] columnNames, + final TSDataType[] dataTypes, + final TsTableColumnCategory[] columnCategories, + final int capacity) { + this.tableName = tableName; + this.columnNames = columnNames; + this.dataTypes = dataTypes; + this.columnCategories = columnCategories; + this.capacity = Math.max(capacity, 1); + this.times = new long[this.capacity]; + this.columnValues = new Object[columnNames.length]; + this.bitMaps = new BitMap[columnNames.length]; + for (int c = 0; c < columnNames.length; c++) { + this.columnValues[c] = allocateColumn(dataTypes[c], this.capacity); + this.bitMaps[c] = null; + } + } + + /** Begins a new row at {@code time}. Must be followed by set* calls for every non-null cell. */ + void startRow(final long time) { + if (rowCount >= capacity) { + throw new IllegalStateException( + "OtlpTableBatch overflow: row " + rowCount + " >= capacity " + capacity); + } + times[rowCount] = time; + rowCount++; + } + + void setString(final int column, final String value) { + if (value == null) { + return; + } + final Binary[] arr = (Binary[]) columnValues[column]; + arr[rowCount - 1] = + value.isEmpty() ? EMPTY_BINARY : new Binary(value.getBytes(StandardCharsets.UTF_8)); + } + + void setLong(final int column, final long value) { + final long[] arr = (long[]) columnValues[column]; + arr[rowCount - 1] = value; + } + + void setInt(final int column, final int value) { + final int[] arr = (int[]) columnValues[column]; + arr[rowCount - 1] = value; + } + + void setDouble(final int column, final double value) { + final double[] arr = (double[]) columnValues[column]; + arr[rowCount - 1] = value; + } + + String tableName() { + return tableName; + } + + String[] columnNames() { + return columnNames; + } + + TSDataType[] dataTypes() { + return dataTypes; + } + + TsTableColumnCategory[] columnCategories() { + return columnCategories; + } + + int rowCount() { + return rowCount; + } + + long[] times() { + if (rowCount == capacity) { + return times; + } + final long[] trimmed = new long[rowCount]; + System.arraycopy(times, 0, trimmed, 0, rowCount); + return trimmed; + } + + Object[] columnValues() { + // For string columns, replace any unassigned (null) slot with EMPTY_BINARY so the writer + // has a real Binary object to work with; the bit stays cleared so the cell still reads as + // not-null. If you need true nullability, mark the bit explicitly before calling this. + for (int c = 0; c < columnNames.length; c++) { + if (columnValues[c] instanceof Binary[]) { + final Binary[] arr = (Binary[]) columnValues[c]; + for (int r = 0; r < rowCount; r++) { + if (arr[r] == null) { + arr[r] = EMPTY_BINARY; + } + } + } + } + if (rowCount == capacity) { + return columnValues; + } + final Object[] trimmed = new Object[columnNames.length]; + for (int c = 0; c < columnNames.length; c++) { + trimmed[c] = trimColumn(columnValues[c], dataTypes[c], rowCount); + } + return trimmed; + } + + BitMap[] bitMaps() { + // Fresh BitMaps: every bit zero = every cell not-null. OTLP rows always provide scalar + // defaults (empty strings, 0 ints) rather than true nulls, so we never mark anything. + final BitMap[] out = new BitMap[columnNames.length]; + for (int c = 0; c < columnNames.length; c++) { + out[c] = new BitMap(rowCount); + } + return out; + } + + private static Object allocateColumn(final TSDataType type, final int capacity) { + switch (type) { + case INT32: + return new int[capacity]; + case INT64: + case TIMESTAMP: + return new long[capacity]; + case FLOAT: + return new float[capacity]; + case DOUBLE: + return new double[capacity]; + case BOOLEAN: + return new boolean[capacity]; + case TEXT: + case STRING: + case BLOB: + return new Binary[capacity]; + default: + throw new UnsupportedOperationException("OTLP: unsupported TSDataType " + type); + } + } + + private static Object trimColumn(final Object source, final TSDataType type, final int length) { + switch (type) { + case INT32: + { + final int[] trimmed = new int[length]; + System.arraycopy(source, 0, trimmed, 0, length); + return trimmed; + } + case INT64: + case TIMESTAMP: + { + final long[] trimmed = new long[length]; + System.arraycopy(source, 0, trimmed, 0, length); + return trimmed; + } + case FLOAT: + { + final float[] trimmed = new float[length]; + System.arraycopy(source, 0, trimmed, 0, length); + return trimmed; + } + case DOUBLE: + { + final double[] trimmed = new double[length]; + System.arraycopy(source, 0, trimmed, 0, length); + return trimmed; + } + case BOOLEAN: + { + final boolean[] trimmed = new boolean[length]; + System.arraycopy(source, 0, trimmed, 0, length); + return trimmed; + } + case TEXT: + case STRING: + case BLOB: + { + final Binary[] trimmed = new Binary[length]; + System.arraycopy(source, 0, trimmed, 0, length); + return trimmed; + } + default: + throw new UnsupportedOperationException("OTLP: unsupported TSDataType " + type); + } + } + } +} diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpLogsConverter.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpLogsConverter.java new file mode 100644 index 0000000000000..ff507146a6c06 --- /dev/null +++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpLogsConverter.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rest.protocol.otlp.v1; + +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.rest.protocol.otlp.v1.OtlpIngestor.OtlpTableBatch; + +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.logs.v1.LogRecord; +import io.opentelemetry.proto.logs.v1.ResourceLogs; +import io.opentelemetry.proto.logs.v1.ScopeLogs; +import org.apache.tsfile.enums.TSDataType; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +final class OtlpLogsConverter { + + private static final String TABLE = "logs"; + + // TAG(3) + ATTRIBUTE(4) + FIELD(18) = 25 columns + private static final String[] COLUMN_NAMES = { + "user_id", + "session_id", + "event_name", + "terminal_type", + "service_version", + "os_type", + "host_arch", + "prompt_id", + "event_sequence", + "body", + "prompt_length", + "prompt", + "model", + "cost_usd", + "duration_ms", + "input_tokens", + "output_tokens", + "cache_read_tokens", + "cache_creation_tokens", + "request_id", + "speed", + "error", + "status_code", + "attempt", + "tool_name", + "success", + "tool_duration_ms", + "decision", + "decision_source", + "tool_result_size_bytes" + }; + private static final TSDataType[] DATA_TYPES = { + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.INT32, + TSDataType.STRING, + TSDataType.INT32, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.DOUBLE, + TSDataType.INT64, + TSDataType.INT64, + TSDataType.INT64, + TSDataType.INT64, + TSDataType.INT64, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.INT32, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.INT64, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.INT64 + }; + private static final TsTableColumnCategory[] CATEGORIES = { + TsTableColumnCategory.TAG, + TsTableColumnCategory.TAG, + TsTableColumnCategory.TAG, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD + }; + + private static final int C_USER_ID = 0; + private static final int C_SESSION_ID = 1; + private static final int C_EVENT_NAME = 2; + private static final int C_TERMINAL_TYPE = 3; + private static final int C_SERVICE_VERSION = 4; + private static final int C_OS_TYPE = 5; + private static final int C_HOST_ARCH = 6; + private static final int C_PROMPT_ID = 7; + private static final int C_EVENT_SEQUENCE = 8; + private static final int C_BODY = 9; + private static final int C_PROMPT_LENGTH = 10; + private static final int C_PROMPT = 11; + private static final int C_MODEL = 12; + private static final int C_COST_USD = 13; + private static final int C_DURATION_MS = 14; + private static final int C_INPUT_TOKENS = 15; + private static final int C_OUTPUT_TOKENS = 16; + private static final int C_CACHE_READ_TOKENS = 17; + private static final int C_CACHE_CREATION_TOKENS = 18; + private static final int C_REQUEST_ID = 19; + private static final int C_SPEED = 20; + private static final int C_ERROR = 21; + private static final int C_STATUS_CODE = 22; + private static final int C_ATTEMPT = 23; + private static final int C_TOOL_NAME = 24; + private static final int C_SUCCESS = 25; + private static final int C_TOOL_DURATION_MS = 26; + private static final int C_DECISION = 27; + private static final int C_DECISION_SOURCE = 28; + private static final int C_TOOL_RESULT_SIZE_BYTES = 29; + + private OtlpLogsConverter() {} + + static boolean convertAndInsert( + final OtlpService service, final ExportLogsServiceRequest request) { + final Map> byDb = new HashMap<>(); + for (final ResourceLogs rl : request.getResourceLogsList()) { + final String db = + OtlpConverter.deriveDatabaseName( + OtlpConverter.extractServiceName(rl.getResource().getAttributesList())); + byDb.computeIfAbsent(db, k -> new ArrayList<>()).add(rl); + } + boolean allOk = true; + for (final Map.Entry> entry : byDb.entrySet()) { + if (!insertForDatabase(service, entry.getKey(), entry.getValue())) { + allOk = false; + } + } + return allOk; + } + + private static boolean insertForDatabase( + final OtlpService service, final String db, final List rls) { + int capacity = 0; + for (final ResourceLogs rl : rls) { + for (final ScopeLogs sl : rl.getScopeLogsList()) { + capacity += sl.getLogRecordsCount(); + } + } + if (capacity == 0) { + return true; + } + final IClientSession session = service.sessionFor(db); + final OtlpTableBatch batch = + new OtlpTableBatch(TABLE, COLUMN_NAMES, DATA_TYPES, CATEGORIES, capacity); + + for (final ResourceLogs rl : rls) { + final List resAttrs = rl.getResource().getAttributesList(); + final String serviceVersion = OtlpConverter.extractAttribute(resAttrs, "service.version"); + final String osType = OtlpConverter.extractAttribute(resAttrs, "os.type"); + final String hostArch = OtlpConverter.extractAttribute(resAttrs, "host.arch"); + for (final ScopeLogs sl : rl.getScopeLogsList()) { + for (final LogRecord log : sl.getLogRecordsList()) { + final long ts = + log.getTimeUnixNano() != 0 ? log.getTimeUnixNano() : log.getObservedTimeUnixNano(); + batch.startRow(OtlpConverter.nanoToDbPrecision(ts)); + + final List attrs = log.getAttributesList(); + final String eventName = OtlpConverter.extractAttribute(attrs, "event.name"); + + // TAGs + batch.setString(C_USER_ID, OtlpConverter.extractAttribute(attrs, "user.id")); + batch.setString(C_SESSION_ID, OtlpConverter.extractAttribute(attrs, "session.id")); + batch.setString(C_EVENT_NAME, eventName); + // ATTRIBUTEs + batch.setString(C_TERMINAL_TYPE, OtlpConverter.extractAttribute(attrs, "terminal.type")); + batch.setString(C_SERVICE_VERSION, serviceVersion); + batch.setString(C_OS_TYPE, osType); + batch.setString(C_HOST_ARCH, hostArch); + // Common FIELDs + batch.setString(C_PROMPT_ID, OtlpConverter.extractAttribute(attrs, "prompt.id")); + setIntFromAttr(batch, C_EVENT_SEQUENCE, attrs, "event.sequence"); + batch.setString(C_BODY, bodyToString(log.getBody())); + + // Event-specific FIELDs + if ("user_prompt".equals(eventName)) { + setIntFromAttr(batch, C_PROMPT_LENGTH, attrs, "prompt_length"); + batch.setString(C_PROMPT, OtlpConverter.extractAttribute(attrs, "prompt")); + } else if ("api_request".equals(eventName)) { + batch.setString(C_MODEL, OtlpConverter.extractAttribute(attrs, "model")); + setDoubleFromAttr(batch, C_COST_USD, attrs, "cost_usd"); + setLongFromAttr(batch, C_DURATION_MS, attrs, "duration_ms"); + setLongFromAttr(batch, C_INPUT_TOKENS, attrs, "input_tokens"); + setLongFromAttr(batch, C_OUTPUT_TOKENS, attrs, "output_tokens"); + setLongFromAttr(batch, C_CACHE_READ_TOKENS, attrs, "cache_read_tokens"); + setLongFromAttr(batch, C_CACHE_CREATION_TOKENS, attrs, "cache_creation_tokens"); + batch.setString(C_REQUEST_ID, OtlpConverter.extractAttribute(attrs, "request_id")); + batch.setString(C_SPEED, OtlpConverter.extractAttribute(attrs, "speed")); + } else if ("api_error".equals(eventName)) { + batch.setString(C_MODEL, OtlpConverter.extractAttribute(attrs, "model")); + batch.setString(C_ERROR, OtlpConverter.extractAttribute(attrs, "error")); + batch.setString(C_STATUS_CODE, OtlpConverter.extractAttribute(attrs, "status_code")); + setLongFromAttr(batch, C_DURATION_MS, attrs, "duration_ms"); + setIntFromAttr(batch, C_ATTEMPT, attrs, "attempt"); + batch.setString(C_REQUEST_ID, OtlpConverter.extractAttribute(attrs, "request_id")); + batch.setString(C_SPEED, OtlpConverter.extractAttribute(attrs, "speed")); + } else if ("tool_result".equals(eventName)) { + batch.setString(C_TOOL_NAME, OtlpConverter.extractAttribute(attrs, "tool_name")); + batch.setString(C_SUCCESS, OtlpConverter.extractAttribute(attrs, "success")); + setLongFromAttr(batch, C_TOOL_DURATION_MS, attrs, "duration_ms"); + batch.setString( + C_DECISION_SOURCE, OtlpConverter.extractAttribute(attrs, "decision_source")); + batch.setString(C_DECISION, OtlpConverter.extractAttribute(attrs, "decision_type")); + setLongFromAttr(batch, C_TOOL_RESULT_SIZE_BYTES, attrs, "tool_result_size_bytes"); + } else if ("tool_decision".equals(eventName)) { + batch.setString(C_TOOL_NAME, OtlpConverter.extractAttribute(attrs, "tool_name")); + batch.setString(C_DECISION, OtlpConverter.extractAttribute(attrs, "decision")); + batch.setString(C_DECISION_SOURCE, OtlpConverter.extractAttribute(attrs, "source")); + } + } + } + } + return OtlpIngestor.insert(db, session, batch); + } + + private static void setLongFromAttr( + final OtlpTableBatch batch, final int col, final List attrs, final String key) { + final String v = OtlpConverter.extractAttribute(attrs, key); + if (!v.isEmpty()) { + try { + batch.setLong(col, Long.parseLong(v)); + } catch (final NumberFormatException ignored) { + } + } + } + + private static void setIntFromAttr( + final OtlpTableBatch batch, final int col, final List attrs, final String key) { + final String v = OtlpConverter.extractAttribute(attrs, key); + if (!v.isEmpty()) { + try { + batch.setInt(col, Integer.parseInt(v)); + } catch (final NumberFormatException ignored) { + } + } + } + + private static void setDoubleFromAttr( + final OtlpTableBatch batch, final int col, final List attrs, final String key) { + final String v = OtlpConverter.extractAttribute(attrs, key); + if (!v.isEmpty()) { + try { + batch.setDouble(col, Double.parseDouble(v)); + } catch (final NumberFormatException ignored) { + } + } + } + + private static String bodyToString(final AnyValue body) { + if (body == null) { + return ""; + } + switch (body.getValueCase()) { + case STRING_VALUE: + return body.getStringValue(); + case BOOL_VALUE: + return Boolean.toString(body.getBoolValue()); + case INT_VALUE: + return Long.toString(body.getIntValue()); + case DOUBLE_VALUE: + return Double.toString(body.getDoubleValue()); + case BYTES_VALUE: + return OtlpConverter.bytesToHex(body.getBytesValue()); + default: + return body.toString(); + } + } +} diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpLogsResource.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpLogsResource.java new file mode 100644 index 0000000000000..d3d81d0188280 --- /dev/null +++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpLogsResource.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rest.protocol.otlp.v1; + +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.Response; + +/** OTLP/HTTP logs endpoint, served at {@code /rest/otlp/v1/logs}. */ +@Path("/rest/v1/otlp/v1/logs") +public class OtlpLogsResource { + + private static final Logger LOGGER = LoggerFactory.getLogger(OtlpLogsResource.class); + + @POST + public Response export(@Context final HttpHeaders headers, final byte[] body) { + final boolean protobuf = OtlpHttp.isProtobuf(headers); + try { + final ExportLogsServiceRequest request = + OtlpHttp.parse(body, ExportLogsServiceRequest.newBuilder(), protobuf).build(); + final boolean ok = OtlpService.getInstance().ingestLogs(request); + if (!ok) { + return OtlpHttp.partialFailure(protobuf, "OTLP logs insert failed"); + } + return OtlpHttp.success(ExportLogsServiceResponse.getDefaultInstance(), protobuf); + } catch (final Exception e) { + LOGGER.warn("OTLP logs export failed", e); + return OtlpHttp.badRequest(protobuf, e.getMessage()); + } + } +} diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpMetricsConverter.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpMetricsConverter.java new file mode 100644 index 0000000000000..0e27b19070ad0 --- /dev/null +++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpMetricsConverter.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rest.protocol.otlp.v1; + +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.rest.protocol.otlp.v1.OtlpIngestor.OtlpTableBatch; + +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.HistogramDataPoint; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; +import io.opentelemetry.proto.metrics.v1.SummaryDataPoint; +import org.apache.tsfile.enums.TSDataType; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +final class OtlpMetricsConverter { + + private static final String TABLE = "metrics"; + + // TAG(5) + ATTRIBUTE(8) + FIELD(1) = 14 columns + private static final String[] COLUMN_NAMES = { + "user_id", + "session_id", + "metric_name", + "model", + "type", + "terminal_type", + "service_version", + "os_type", + "os_version", + "host_arch", + "unit", + "metric_type", + "description", + "value" + }; + private static final TSDataType[] DATA_TYPES = { + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.DOUBLE + }; + private static final TsTableColumnCategory[] CATEGORIES = { + TsTableColumnCategory.TAG, + TsTableColumnCategory.TAG, + TsTableColumnCategory.TAG, + TsTableColumnCategory.TAG, + TsTableColumnCategory.TAG, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.FIELD + }; + + // Column indices + private static final int C_USER_ID = 0; + private static final int C_SESSION_ID = 1; + private static final int C_METRIC_NAME = 2; + private static final int C_MODEL = 3; + private static final int C_TYPE = 4; + private static final int C_TERMINAL_TYPE = 5; + private static final int C_SERVICE_VERSION = 6; + private static final int C_OS_TYPE = 7; + private static final int C_OS_VERSION = 8; + private static final int C_HOST_ARCH = 9; + private static final int C_UNIT = 10; + private static final int C_METRIC_TYPE = 11; + private static final int C_DESCRIPTION = 12; + private static final int C_VALUE = 13; + + private OtlpMetricsConverter() {} + + static boolean convertAndInsert( + final OtlpService service, final ExportMetricsServiceRequest request) { + final Map> byDb = new HashMap<>(); + for (final ResourceMetrics rm : request.getResourceMetricsList()) { + final String db = + OtlpConverter.deriveDatabaseName( + OtlpConverter.extractServiceName(rm.getResource().getAttributesList())); + byDb.computeIfAbsent(db, k -> new ArrayList<>()).add(rm); + } + boolean allOk = true; + for (final Map.Entry> entry : byDb.entrySet()) { + if (!insertForDatabase(service, entry.getKey(), entry.getValue())) { + allOk = false; + } + } + return allOk; + } + + private static boolean insertForDatabase( + final OtlpService service, final String db, final List rms) { + int capacity = 0; + for (final ResourceMetrics rm : rms) { + for (final ScopeMetrics sm : rm.getScopeMetricsList()) { + for (final Metric m : sm.getMetricsList()) { + capacity += countDataPoints(m); + } + } + } + if (capacity == 0) { + return true; + } + final IClientSession session = service.sessionFor(db); + final OtlpTableBatch batch = + new OtlpTableBatch(TABLE, COLUMN_NAMES, DATA_TYPES, CATEGORIES, capacity); + + for (final ResourceMetrics rm : rms) { + final List resAttrs = rm.getResource().getAttributesList(); + final String serviceVersion = OtlpConverter.extractAttribute(resAttrs, "service.version"); + final String osType = OtlpConverter.extractAttribute(resAttrs, "os.type"); + final String osVersion = OtlpConverter.extractAttribute(resAttrs, "os.version"); + final String hostArch = OtlpConverter.extractAttribute(resAttrs, "host.arch"); + for (final ScopeMetrics sm : rm.getScopeMetricsList()) { + for (final Metric metric : sm.getMetricsList()) { + writeMetric(batch, metric, serviceVersion, osType, osVersion, hostArch); + } + } + } + return OtlpIngestor.insert(db, session, batch); + } + + private static void writeMetric( + final OtlpTableBatch batch, + final Metric metric, + final String serviceVersion, + final String osType, + final String osVersion, + final String hostArch) { + switch (metric.getDataCase()) { + case GAUGE: + for (final NumberDataPoint dp : metric.getGauge().getDataPointsList()) { + writeNumberPoint(batch, metric, "gauge", dp, serviceVersion, osType, osVersion, hostArch); + } + break; + case SUM: + for (final NumberDataPoint dp : metric.getSum().getDataPointsList()) { + writeNumberPoint(batch, metric, "sum", dp, serviceVersion, osType, osVersion, hostArch); + } + break; + case HISTOGRAM: + for (final HistogramDataPoint dp : metric.getHistogram().getDataPointsList()) { + writeRow( + batch, + dp.getTimeUnixNano(), + dp.getAttributesList(), + metric.getName(), + "histogram", + dp.hasSum() ? dp.getSum() : 0.0, + metric.getUnit(), + metric.getDescription(), + serviceVersion, + osType, + osVersion, + hostArch); + } + break; + case EXPONENTIAL_HISTOGRAM: + for (final io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint dp : + metric.getExponentialHistogram().getDataPointsList()) { + writeRow( + batch, + dp.getTimeUnixNano(), + dp.getAttributesList(), + metric.getName(), + "exponential_histogram", + dp.hasSum() ? dp.getSum() : 0.0, + metric.getUnit(), + metric.getDescription(), + serviceVersion, + osType, + osVersion, + hostArch); + } + break; + case SUMMARY: + for (final SummaryDataPoint dp : metric.getSummary().getDataPointsList()) { + writeRow( + batch, + dp.getTimeUnixNano(), + dp.getAttributesList(), + metric.getName(), + "summary", + dp.getSum(), + metric.getUnit(), + metric.getDescription(), + serviceVersion, + osType, + osVersion, + hostArch); + } + break; + default: + break; + } + } + + private static void writeNumberPoint( + final OtlpTableBatch batch, + final Metric metric, + final String metricType, + final NumberDataPoint dp, + final String serviceVersion, + final String osType, + final String osVersion, + final String hostArch) { + writeRow( + batch, + dp.getTimeUnixNano(), + dp.getAttributesList(), + metric.getName(), + metricType, + numericValue(dp), + metric.getUnit(), + metric.getDescription(), + serviceVersion, + osType, + osVersion, + hostArch); + } + + private static void writeRow( + final OtlpTableBatch batch, + final long timeUnixNano, + final List dpAttrs, + final String metricName, + final String metricType, + final double value, + final String unit, + final String description, + final String serviceVersion, + final String osType, + final String osVersion, + final String hostArch) { + batch.startRow(OtlpConverter.nanoToDbPrecision(timeUnixNano)); + // TAGs + batch.setString(C_USER_ID, OtlpConverter.extractAttribute(dpAttrs, "user.id")); + batch.setString(C_SESSION_ID, OtlpConverter.extractAttribute(dpAttrs, "session.id")); + batch.setString(C_METRIC_NAME, metricName); + batch.setString(C_MODEL, OtlpConverter.extractAttribute(dpAttrs, "model")); + batch.setString(C_TYPE, OtlpConverter.extractAttribute(dpAttrs, "type")); + // ATTRIBUTEs + batch.setString(C_TERMINAL_TYPE, OtlpConverter.extractAttribute(dpAttrs, "terminal.type")); + batch.setString(C_SERVICE_VERSION, serviceVersion); + batch.setString(C_OS_TYPE, osType); + batch.setString(C_OS_VERSION, osVersion); + batch.setString(C_HOST_ARCH, hostArch); + batch.setString(C_UNIT, unit); + batch.setString(C_METRIC_TYPE, metricType); + batch.setString(C_DESCRIPTION, description); + // FIELD + batch.setDouble(C_VALUE, value); + } + + private static double numericValue(final NumberDataPoint dp) { + switch (dp.getValueCase()) { + case AS_DOUBLE: + return dp.getAsDouble(); + case AS_INT: + return (double) dp.getAsInt(); + default: + return 0.0; + } + } + + private static int countDataPoints(final Metric metric) { + switch (metric.getDataCase()) { + case GAUGE: + return metric.getGauge().getDataPointsCount(); + case SUM: + return metric.getSum().getDataPointsCount(); + case HISTOGRAM: + return metric.getHistogram().getDataPointsCount(); + case EXPONENTIAL_HISTOGRAM: + return metric.getExponentialHistogram().getDataPointsCount(); + case SUMMARY: + return metric.getSummary().getDataPointsCount(); + default: + return 0; + } + } +} diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpMetricsResource.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpMetricsResource.java new file mode 100644 index 0000000000000..50945ac2cf5d7 --- /dev/null +++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpMetricsResource.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rest.protocol.otlp.v1; + +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.Response; + +/** OTLP/HTTP metrics endpoint, served at {@code /rest/otlp/v1/metrics}. */ +@Path("/rest/v1/otlp/v1/metrics") +public class OtlpMetricsResource { + + private static final Logger LOGGER = LoggerFactory.getLogger(OtlpMetricsResource.class); + + @POST + public Response export(@Context final HttpHeaders headers, final byte[] body) { + final boolean protobuf = OtlpHttp.isProtobuf(headers); + try { + final ExportMetricsServiceRequest request = + OtlpHttp.parse(body, ExportMetricsServiceRequest.newBuilder(), protobuf).build(); + final boolean ok = OtlpService.getInstance().ingestMetrics(request); + if (!ok) { + return OtlpHttp.partialFailure(protobuf, "OTLP metrics insert failed"); + } + return OtlpHttp.success(ExportMetricsServiceResponse.getDefaultInstance(), protobuf); + } catch (final Exception e) { + LOGGER.warn("OTLP metrics export failed", e); + return OtlpHttp.badRequest(protobuf, e.getMessage()); + } + } +} diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpSchemaInitializer.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpSchemaInitializer.java new file mode 100644 index 0000000000000..05b32d924b085 --- /dev/null +++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpSchemaInitializer.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rest.protocol.otlp.v1; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.SessionManager; +import org.apache.iotdb.db.queryengine.plan.Coordinator; +import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; +import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; +import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Ensures the {@code claude_code} database and the {@code traces}, {@code metrics}, {@code logs} + * tables exist before the OTLP receiver writes its first batch. Idempotent via {@code CREATE ... IF + * NOT EXISTS}, so running it once per ingest call is cheap after the initial DDL has been + * replicated across the cluster. + */ +final class OtlpSchemaInitializer { + + private static final Logger LOGGER = LoggerFactory.getLogger(OtlpSchemaInitializer.class); + + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + + private OtlpSchemaInitializer() {} + + static void initialize(final String database, final IClientSession session) { + runDdl(session, null, "CREATE DATABASE IF NOT EXISTS " + database); + + runDdl( + session, + database, + "CREATE TABLE IF NOT EXISTS metrics (" + + "user_id STRING TAG, " + + "session_id STRING TAG, " + + "metric_name STRING TAG, " + + "model STRING TAG, " + + "type STRING TAG, " + + "terminal_type STRING ATTRIBUTE, " + + "service_version STRING ATTRIBUTE, " + + "os_type STRING ATTRIBUTE, " + + "os_version STRING ATTRIBUTE, " + + "host_arch STRING ATTRIBUTE, " + + "unit STRING ATTRIBUTE, " + + "metric_type STRING ATTRIBUTE, " + + "description STRING ATTRIBUTE, " + + "value DOUBLE FIELD)"); + + runDdl( + session, + database, + "CREATE TABLE IF NOT EXISTS logs (" + + "user_id STRING TAG, " + + "session_id STRING TAG, " + + "event_name STRING TAG, " + + "terminal_type STRING ATTRIBUTE, " + + "service_version STRING ATTRIBUTE, " + + "os_type STRING ATTRIBUTE, " + + "host_arch STRING ATTRIBUTE, " + + "prompt_id STRING FIELD, " + + "event_sequence INT32 FIELD, " + + "body STRING FIELD, " + + "prompt_length INT32 FIELD, " + + "prompt STRING FIELD, " + + "model STRING FIELD, " + + "cost_usd DOUBLE FIELD, " + + "duration_ms INT64 FIELD, " + + "input_tokens INT64 FIELD, " + + "output_tokens INT64 FIELD, " + + "cache_read_tokens INT64 FIELD, " + + "cache_creation_tokens INT64 FIELD, " + + "request_id STRING FIELD, " + + "speed STRING FIELD, " + + "error STRING FIELD, " + + "status_code STRING FIELD, " + + "attempt INT32 FIELD, " + + "tool_name STRING FIELD, " + + "success STRING FIELD, " + + "tool_duration_ms INT64 FIELD, " + + "decision STRING FIELD, " + + "decision_source STRING FIELD, " + + "tool_result_size_bytes INT64 FIELD)"); + + runDdl( + session, + database, + "CREATE TABLE IF NOT EXISTS traces (" + + "service_name STRING TAG, " + + "span_name STRING TAG, " + + "service_version STRING ATTRIBUTE, " + + "os_type STRING ATTRIBUTE, " + + "host_arch STRING ATTRIBUTE, " + + "trace_id STRING FIELD, " + + "span_id STRING FIELD, " + + "parent_span_id STRING FIELD, " + + "span_kind STRING FIELD, " + + "start_time_unix_nano INT64 FIELD, " + + "end_time_unix_nano INT64 FIELD, " + + "duration_nano INT64 FIELD, " + + "status_code STRING FIELD, " + + "status_message STRING FIELD, " + + "attributes STRING FIELD, " + + "resource_attributes STRING FIELD, " + + "scope_name STRING FIELD, " + + "scope_version STRING FIELD)"); + } + + private static void runDdl( + final IClientSession session, final String database, final String sql) { + final SessionManager sessionManager = SessionManager.getInstance(); + final Long queryId = sessionManager.requestQueryId(); + try { + if (database == null) { + // CREATE DATABASE must not set a current database; clear it so the parser parses the name + // as-is rather than prepending the current database. + session.setDatabaseName(null); + } else { + session.setDatabaseName(database); + } + session.setSqlDialect(IClientSession.SqlDialect.TABLE); + + final SqlParser parser = new SqlParser(); + final Statement statement = parser.createStatement(sql, session.getZoneId(), session); + final Metadata metadata = LocalExecutionPlanner.getInstance().metadata; + + final ExecutionResult result = + Coordinator.getInstance() + .executeForTableModel( + statement, + parser, + session, + queryId, + sessionManager.getSessionInfo(session), + sql, + metadata, + CONFIG.getQueryTimeoutThreshold(), + false, + false); + final TSStatus status = result.status; + final int code = status.getCode(); + if (code != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() + // 602 = DATABASE_ALREADY_EXISTS, 507 = TABLE_ALREADY_EXISTS in IF NOT EXISTS races + && code != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() + && code != TSStatusCode.TABLE_ALREADY_EXISTS.getStatusCode()) { + LOGGER.warn( + "OTLP schema init DDL failed: sql=[{}], code={}, message={}", + sql, + code, + status.getMessage()); + } + } catch (final Exception e) { + LOGGER.warn("OTLP schema init DDL threw: sql=[{}]", sql, e); + } finally { + Coordinator.getInstance().cleanupQueryExecution(queryId); + } + } +} diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpService.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpService.java new file mode 100644 index 0000000000000..bc40e5edc7611 --- /dev/null +++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpService.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rest.protocol.otlp.v1; + +import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.db.conf.rest.IoTDBRestServiceConfig; +import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor; +import org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.RestClientSession; +import org.apache.iotdb.db.protocol.session.SessionManager; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion; + +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.ZoneId; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Singleton glue between the HTTP OTLP resources and the IoTDB coordinator. + * + *

There is no single "OTLP database": the receiver routes each OTLP resource group to a database + * derived from its {@code service.name} resource attribute (see {@link + * OtlpConverter#deriveDatabaseName}), so traffic from {@code claude-code}, {@code codex}, {@code + * gemini} etc. lands in separate databases automatically. + * + *

Each database gets its own pinned {@link IClientSession} cached in {@link #sessionByDatabase}, + * created lazily on first use along with its {@code traces / metrics / logs} tables. Pinning the + * database per session avoids races between concurrent requests that would otherwise share one + * session's {@code databaseName} field. + */ +public final class OtlpService { + + private static final Logger LOGGER = LoggerFactory.getLogger(OtlpService.class); + + private static final OtlpService INSTANCE = new OtlpService(); + + private final IoTDBRestServiceConfig config = + IoTDBRestServiceDescriptor.getInstance().getConfig(); + + private final ConcurrentHashMap sessionByDatabase = + new ConcurrentHashMap<>(); + + private OtlpService() {} + + public static OtlpService getInstance() { + return INSTANCE; + } + + /** + * Returns a session pinned to {@code database}, creating it and its OTLP tables on first call. + * Safe to invoke from any request thread; the expensive login + DDL run at most once per database + * for the lifetime of the process. + */ + IClientSession sessionFor(final String database) { + final IClientSession existing = sessionByDatabase.get(database); + if (existing != null) { + return existing; + } + return sessionByDatabase.computeIfAbsent(database, this::openDatabaseSession); + } + + private IClientSession openDatabaseSession(final String database) { + final IClientSession session = login(); + if (session == null) { + throw new IllegalStateException( + "OTLP receiver failed to log in as user '" + + config.getOtlpUsername() + + "'. Check otlp_username / otlp_password in iotdb-system.properties."); + } + OtlpSchemaInitializer.initialize(database, session); + // After schema init the session's databaseName is pinned to `database`; subsequent inserts + // on this session all target it. + session.setDatabaseName(database); + session.setSqlDialect(IClientSession.SqlDialect.TABLE); + LOGGER.info( + "OTLP receiver ready for database={} (user={})", database, config.getOtlpUsername()); + return session; + } + + private IClientSession login() { + final SessionManager sm = SessionManager.getInstance(); + // Use 127.0.0.1 as the client address so LoginLockManager's localhost check succeeds without + // triggering a DNS lookup on a synthetic UUID (which logs a spurious UnknownHostException). + final RestClientSession session = new RestClientSession("127.0.0.1"); + session.setUsername(config.getOtlpUsername()); + final BasicOpenSessionResp resp = + sm.login( + session, + config.getOtlpUsername(), + config.getOtlpPassword(), + ZoneId.systemDefault().toString(), + TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3, + IoTDBConstant.ClientVersion.V_1_0, + IClientSession.SqlDialect.TABLE); + if (resp.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.warn( + "OTLP login failed: user={}, code={}, message={}", + config.getOtlpUsername(), + resp.getCode(), + resp.getMessage()); + return null; + } + return session; + } + + /** Ingests an OTLP trace export request. Returns true on all-success. */ + public boolean ingestTraces(final ExportTraceServiceRequest request) { + return OtlpTracesConverter.convertAndInsert(this, request); + } + + /** Ingests an OTLP metrics export request. Returns true on all-success. */ + public boolean ingestMetrics(final ExportMetricsServiceRequest request) { + return OtlpMetricsConverter.convertAndInsert(this, request); + } + + /** Ingests an OTLP logs export request. Returns true on all-success. */ + public boolean ingestLogs(final ExportLogsServiceRequest request) { + return OtlpLogsConverter.convertAndInsert(this, request); + } +} diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpTracesConverter.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpTracesConverter.java new file mode 100644 index 0000000000000..0160ec6daba87 --- /dev/null +++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpTracesConverter.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rest.protocol.otlp.v1; + +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.rest.protocol.otlp.v1.OtlpIngestor.OtlpTableBatch; + +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.trace.v1.ResourceSpans; +import io.opentelemetry.proto.trace.v1.ScopeSpans; +import io.opentelemetry.proto.trace.v1.Span; +import org.apache.tsfile.enums.TSDataType; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Flattens OTLP trace export requests into rows for each service's {@code traces} table. */ +final class OtlpTracesConverter { + + private static final String TABLE = "traces"; + + // TAG(2) + ATTRIBUTE(3) + FIELD(12) = 17 columns + private static final String[] COLUMN_NAMES = { + "service_name", + "span_name", + "service_version", + "os_type", + "host_arch", + "trace_id", + "span_id", + "parent_span_id", + "span_kind", + "start_time_unix_nano", + "end_time_unix_nano", + "duration_nano", + "status_code", + "status_message", + "attributes", + "resource_attributes", + "scope_name", + "scope_version" + }; + private static final TSDataType[] DATA_TYPES = { + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.INT64, + TSDataType.INT64, + TSDataType.INT64, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING + }; + private static final TsTableColumnCategory[] COLUMN_CATEGORIES = { + TsTableColumnCategory.TAG, + TsTableColumnCategory.TAG, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD + }; + + private OtlpTracesConverter() {} + + static boolean convertAndInsert( + final OtlpService service, final ExportTraceServiceRequest request) { + final Map> byDatabase = new HashMap<>(); + for (final ResourceSpans rs : request.getResourceSpansList()) { + final String db = + OtlpConverter.deriveDatabaseName( + OtlpConverter.extractServiceName(rs.getResource().getAttributesList())); + byDatabase.computeIfAbsent(db, k -> new ArrayList<>()).add(rs); + } + + boolean allOk = true; + for (final Map.Entry> entry : byDatabase.entrySet()) { + if (!insertForDatabase(service, entry.getKey(), entry.getValue())) { + allOk = false; + } + } + return allOk; + } + + private static boolean insertForDatabase( + final OtlpService service, final String database, final List resourceSpans) { + int capacity = 0; + for (final ResourceSpans rs : resourceSpans) { + for (final ScopeSpans ss : rs.getScopeSpansList()) { + capacity += ss.getSpansCount(); + } + } + if (capacity == 0) { + return true; + } + final IClientSession session = service.sessionFor(database); + final OtlpTableBatch batch = + new OtlpTableBatch(TABLE, COLUMN_NAMES, DATA_TYPES, COLUMN_CATEGORIES, capacity); + + for (final ResourceSpans rs : resourceSpans) { + final java.util.List resAttrs = + rs.getResource().getAttributesList(); + final String serviceName = OtlpConverter.extractServiceName(resAttrs); + final String serviceVersion = OtlpConverter.extractAttribute(resAttrs, "service.version"); + final String osType = OtlpConverter.extractAttribute(resAttrs, "os.type"); + final String hostArch = OtlpConverter.extractAttribute(resAttrs, "host.arch"); + final String resourceAttrsJson = OtlpConverter.attributesToJson(resAttrs); + for (final ScopeSpans ss : rs.getScopeSpansList()) { + final String scopeName = ss.getScope().getName(); + final String scopeVersion = ss.getScope().getVersion(); + for (final Span span : ss.getSpansList()) { + final long startNano = span.getStartTimeUnixNano(); + final long endNano = span.getEndTimeUnixNano(); + batch.startRow(OtlpConverter.nanoToDbPrecision(startNano)); + int c = 0; + // TAGs + batch.setString(c++, serviceName); + batch.setString(c++, span.getName()); + // ATTRIBUTEs + batch.setString(c++, serviceVersion); + batch.setString(c++, osType); + batch.setString(c++, hostArch); + // FIELDs + batch.setString(c++, OtlpConverter.bytesToHex(span.getTraceId())); + batch.setString(c++, OtlpConverter.bytesToHex(span.getSpanId())); + batch.setString(c++, OtlpConverter.bytesToHex(span.getParentSpanId())); + batch.setString(c++, span.getKind().name()); + batch.setLong(c++, startNano); + batch.setLong(c++, endNano); + batch.setLong(c++, endNano - startNano); + batch.setString(c++, statusCode(span)); + batch.setString(c++, span.getStatus().getMessage()); + batch.setString(c++, OtlpConverter.attributesToJson(span.getAttributesList())); + batch.setString(c++, resourceAttrsJson); + batch.setString(c++, scopeName); + batch.setString(c, scopeVersion); + } + } + } + return OtlpIngestor.insert(database, session, batch); + } + + private static String statusCode(final Span span) { + // Status is a top-level proto type in opentelemetry-proto, not nested in Span. + // OTLP status codes: STATUS_CODE_UNSET = 0, STATUS_CODE_OK = 1, STATUS_CODE_ERROR = 2. + if (!span.hasStatus()) { + return "UNSET"; + } + final io.opentelemetry.proto.trace.v1.Status status = span.getStatus(); + switch (status.getCode()) { + case STATUS_CODE_OK: + return "OK"; + case STATUS_CODE_ERROR: + return "ERROR"; + case STATUS_CODE_UNSET: + case UNRECOGNIZED: + default: + return "UNSET"; + } + } +} diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpTracesResource.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpTracesResource.java new file mode 100644 index 0000000000000..06e21051fe22a --- /dev/null +++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpTracesResource.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rest.protocol.otlp.v1; + +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.Response; + +/** + * OTLP/HTTP traces endpoint. OpenTelemetry fixes the signal suffix to {@code /v1/traces}; IoTDB + * serves the base under {@code /rest/otlp} so the full URL is {@code /rest/otlp/v1/traces} and + * clients configure {@code OTEL_EXPORTER_OTLP_ENDPOINT=http://:18080/rest/otlp}. + */ +@Path("/rest/v1/otlp/v1/traces") +public class OtlpTracesResource { + + private static final Logger LOGGER = LoggerFactory.getLogger(OtlpTracesResource.class); + + @POST + public Response export(@Context final HttpHeaders headers, final byte[] body) { + final boolean protobuf = OtlpHttp.isProtobuf(headers); + try { + final ExportTraceServiceRequest request = + OtlpHttp.parse(body, ExportTraceServiceRequest.newBuilder(), protobuf).build(); + final boolean ok = OtlpService.getInstance().ingestTraces(request); + if (!ok) { + return OtlpHttp.partialFailure(protobuf, "OTLP trace insert failed"); + } + return OtlpHttp.success(ExportTraceServiceResponse.getDefaultInstance(), protobuf); + } catch (final Exception e) { + LOGGER.warn("OTLP trace export failed", e); + return OtlpHttp.badRequest(protobuf, e.getMessage()); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/rest/IoTDBRestServiceConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/rest/IoTDBRestServiceConfig.java index 64c0f65fe3034..c720c3815274c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/rest/IoTDBRestServiceConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/rest/IoTDBRestServiceConfig.java @@ -62,6 +62,12 @@ public class IoTDBRestServiceConfig { /** Is client authentication required. */ private boolean clientAuth = false; + /** IoTDB user the OTLP receiver logs in as when writing telemetry data. */ + private String otlpUsername = "root"; + + /** Password paired with {@link #otlpUsername}. */ + private String otlpPassword = "root"; + public boolean isClientAuth() { return clientAuth; } @@ -173,4 +179,20 @@ public int getRestQueryDefaultRowSizeLimit() { public void setRestQueryDefaultRowSizeLimit(int restQueryDefaultRowSizeLimit) { this.restQueryDefaultRowSizeLimit = restQueryDefaultRowSizeLimit; } + + public String getOtlpUsername() { + return otlpUsername; + } + + public void setOtlpUsername(String otlpUsername) { + this.otlpUsername = otlpUsername; + } + + public String getOtlpPassword() { + return otlpPassword; + } + + public void setOtlpPassword(String otlpPassword) { + this.otlpPassword = otlpPassword; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/rest/IoTDBRestServiceDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/rest/IoTDBRestServiceDescriptor.java index c4f9d131de958..3d9a602c690be 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/rest/IoTDBRestServiceDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/rest/IoTDBRestServiceDescriptor.java @@ -111,6 +111,8 @@ private void loadProps(TrimProperties trimProperties) { Integer.parseInt( trimProperties.getProperty( "idle_timeout_in_seconds", Integer.toString(conf.getIdleTimeoutInSeconds())))); + conf.setOtlpUsername(trimProperties.getProperty("otlp_username", conf.getOtlpUsername())); + conf.setOtlpPassword(trimProperties.getProperty("otlp_password", conf.getOtlpPassword())); } /** diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 378a6226cbffd..7a7f3314aa047 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -594,6 +594,33 @@ client_auth=false # Datatype: int idle_timeout_in_seconds=50000 +#################### +### OpenTelemetry (OTLP) Receiver configuration +#################### +# The OTLP receiver is exposed under the REST service on port rest_service_port at base +# path /rest/v1/otlp. It accepts OTLP/HTTP requests at: +# POST /rest/v1/otlp/v1/traces +# POST /rest/v1/otlp/v1/metrics +# POST /rest/v1/otlp/v1/logs +# Both application/x-protobuf (default OTLP encoding) and application/json are accepted. +# The receiver is only active while enable_rest_service=true. +# +# Each OTLP request's resource attribute `service.name` is translated into a database name +# (e.g. "claude-code" -> claude_code, "codex" -> codex, "Gemini CLI" -> gemini_cli; missing +# service.name falls back to `unknown_service`). Database and tables are created on first +# use of each distinct service. + +# IoTDB user the OTLP receiver logs in as when writing telemetry data. +# effectiveMode: restart +# Datatype: String +otlp_username=root + +# Password paired with otlp_username. Leave the default if you are running an unchanged +# root/root installation; override in production. +# effectiveMode: restart +# Datatype: String +otlp_password=root + #################### ### Load balancing configuration ####################