attributes) {
+ if (attributes == null || attributes.isEmpty()) {
+ return "{}";
+ }
+ final StringBuilder sb = new StringBuilder(64);
+ sb.append('{');
+ boolean first = true;
+ for (final KeyValue kv : attributes) {
+ if (!first) {
+ sb.append(',');
+ }
+ first = false;
+ appendJsonString(sb, kv.getKey());
+ sb.append(':');
+ appendAnyValue(sb, kv.getValue());
+ }
+ sb.append('}');
+ return sb.toString();
+ }
+
+ /**
+ * Fallback database name used when an OTLP request carries no {@code service.name} resource
+ * attribute. Matches the OpenTelemetry convention for unnamed services.
+ */
+ static final String UNKNOWN_SERVICE_DATABASE = "unknown_service";
+
+ /**
+ * Derives a valid IoTDB database identifier from an OTLP {@code service.name}. Lower-cases the
+ * string and rewrites any character that is not a letter, digit, or underscore into an
+ * underscore; prefixes an underscore when the first character would be a digit (IoTDB identifiers
+ * must start with a letter or underscore). Empty / null service names fall back to {@link
+ * #UNKNOWN_SERVICE_DATABASE}.
+ *
+ * Examples: {@code "claude-code" -> "claude_code"}, {@code "Gemini CLI" -> "gemini_cli"},
+ * {@code "codex" -> "codex"}, {@code "" -> "unknown_service"}.
+ */
+ static String deriveDatabaseName(final String serviceName) {
+ if (serviceName == null || serviceName.isEmpty()) {
+ return UNKNOWN_SERVICE_DATABASE;
+ }
+ final StringBuilder sb = new StringBuilder(serviceName.length());
+ for (int i = 0; i < serviceName.length(); i++) {
+ final char c = serviceName.charAt(i);
+ if ((c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '_') {
+ sb.append(c);
+ } else if (c >= 'A' && c <= 'Z') {
+ sb.append((char) (c + 32));
+ } else {
+ sb.append('_');
+ }
+ }
+ if (sb.length() == 0) {
+ return UNKNOWN_SERVICE_DATABASE;
+ }
+ if (sb.charAt(0) >= '0' && sb.charAt(0) <= '9') {
+ sb.insert(0, '_');
+ }
+ return sb.toString();
+ }
+
+ /** Looks up {@code service.name} from a resource attribute list. Returns "" if absent. */
+ static String extractServiceName(final List resourceAttrs) {
+ if (resourceAttrs == null) {
+ return "";
+ }
+ for (final KeyValue kv : resourceAttrs) {
+ if ("service.name".equals(kv.getKey())) {
+ final AnyValue v = kv.getValue();
+ if (v != null && v.hasStringValue()) {
+ return v.getStringValue();
+ }
+ }
+ }
+ return "";
+ }
+
+ /** Looks up an attribute by key from a flat attribute list, returning "" if missing. */
+ static String extractAttribute(final List attrs, final String key) {
+ if (attrs == null) {
+ return "";
+ }
+ for (final KeyValue kv : attrs) {
+ if (key.equals(kv.getKey())) {
+ return anyValueToString(kv.getValue());
+ }
+ }
+ return "";
+ }
+
+ private static void appendAnyValue(final StringBuilder sb, final AnyValue value) {
+ if (value == null) {
+ sb.append("null");
+ return;
+ }
+ switch (value.getValueCase()) {
+ case STRING_VALUE:
+ appendJsonString(sb, value.getStringValue());
+ break;
+ case BOOL_VALUE:
+ sb.append(value.getBoolValue());
+ break;
+ case INT_VALUE:
+ sb.append(value.getIntValue());
+ break;
+ case DOUBLE_VALUE:
+ {
+ final double d = value.getDoubleValue();
+ if (Double.isFinite(d)) {
+ sb.append(d);
+ } else {
+ appendJsonString(sb, Double.toString(d));
+ }
+ break;
+ }
+ case ARRAY_VALUE:
+ {
+ sb.append('[');
+ boolean first = true;
+ for (final AnyValue item : value.getArrayValue().getValuesList()) {
+ if (!first) {
+ sb.append(',');
+ }
+ first = false;
+ appendAnyValue(sb, item);
+ }
+ sb.append(']');
+ break;
+ }
+ case KVLIST_VALUE:
+ {
+ sb.append('{');
+ boolean first = true;
+ for (final KeyValue kv : value.getKvlistValue().getValuesList()) {
+ if (!first) {
+ sb.append(',');
+ }
+ first = false;
+ appendJsonString(sb, kv.getKey());
+ sb.append(':');
+ appendAnyValue(sb, kv.getValue());
+ }
+ sb.append('}');
+ break;
+ }
+ case BYTES_VALUE:
+ appendJsonString(sb, bytesToHex(value.getBytesValue()));
+ break;
+ case VALUE_NOT_SET:
+ default:
+ sb.append("null");
+ break;
+ }
+ }
+
+ private static String anyValueToString(final AnyValue value) {
+ if (value == null) {
+ return "";
+ }
+ switch (value.getValueCase()) {
+ case STRING_VALUE:
+ return value.getStringValue();
+ case BOOL_VALUE:
+ return Boolean.toString(value.getBoolValue());
+ case INT_VALUE:
+ return Long.toString(value.getIntValue());
+ case DOUBLE_VALUE:
+ return Double.toString(value.getDoubleValue());
+ case BYTES_VALUE:
+ return bytesToHex(value.getBytesValue());
+ default:
+ // Fall back to full JSON encoding for nested structures so callers still get a usable
+ // string instead of the empty placeholder.
+ final StringBuilder sb = new StringBuilder();
+ appendAnyValue(sb, value);
+ return sb.toString();
+ }
+ }
+
+ private static void appendJsonString(final StringBuilder sb, final String s) {
+ sb.append('"');
+ for (int i = 0; i < s.length(); i++) {
+ final char c = s.charAt(i);
+ switch (c) {
+ case '"':
+ sb.append("\\\"");
+ break;
+ case '\\':
+ sb.append("\\\\");
+ break;
+ case '\n':
+ sb.append("\\n");
+ break;
+ case '\r':
+ sb.append("\\r");
+ break;
+ case '\t':
+ sb.append("\\t");
+ break;
+ case '\b':
+ sb.append("\\b");
+ break;
+ case '\f':
+ sb.append("\\f");
+ break;
+ default:
+ if (c < 0x20) {
+ sb.append(String.format("\\u%04x", (int) c));
+ } else {
+ sb.append(c);
+ }
+ }
+ }
+ sb.append('"');
+ }
+}
diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpHttp.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpHttp.java
new file mode 100644
index 0000000000000..4169ab37b8a79
--- /dev/null
+++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpHttp.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rest.protocol.otlp.v1;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.util.JsonFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Locale;
+
+/**
+ * Shared HTTP plumbing for the three OTLP JAX-RS resources: parses the request body into the
+ * appropriate protobuf message (protobuf or JSON), and renders the empty success response in the
+ * encoding the client expects.
+ *
+ * OTLP/HTTP spec: content types are {@code application/x-protobuf} (default, required) and
+ * {@code application/json} (optional). Other content types are rejected with 415.
+ */
+final class OtlpHttp {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(OtlpHttp.class);
+
+ static final String PROTOBUF = "application/x-protobuf";
+ static final String JSON = "application/json";
+
+ private static final JsonFormat.Parser JSON_PARSER = JsonFormat.parser().ignoringUnknownFields();
+
+ private OtlpHttp() {}
+
+ /** True if the request body should be parsed as protobuf; false for JSON. */
+ static boolean isProtobuf(final HttpHeaders headers) {
+ final String raw = headers.getHeaderString(HttpHeaders.CONTENT_TYPE);
+ if (raw == null || raw.isEmpty()) {
+ // Default per OTLP spec.
+ return true;
+ }
+ final String contentType = raw.toLowerCase(Locale.ROOT);
+ if (contentType.startsWith(PROTOBUF)) {
+ return true;
+ }
+ if (contentType.startsWith(JSON)) {
+ return false;
+ }
+ // Fall back to protobuf for unrecognized types; the parser will fail cleanly if wrong.
+ return true;
+ }
+
+ /** Parses the request body into {@code builder}. Caller provides a fresh builder. */
+ static T parse(
+ final byte[] body, final T builder, final boolean protobuf) throws Exception {
+ if (body == null || body.length == 0) {
+ return builder;
+ }
+ if (protobuf) {
+ builder.mergeFrom(body);
+ } else {
+ JSON_PARSER.merge(new String(body, StandardCharsets.UTF_8), builder);
+ }
+ return builder;
+ }
+
+ /** Builds an OTLP success response (empty message) in the client's preferred encoding. */
+ static Response success(final Message emptyResponse, final boolean protobuf) {
+ if (protobuf) {
+ return Response.ok(emptyResponse.toByteArray(), PROTOBUF).build();
+ }
+ // JSON for the empty response is "{}".
+ return Response.ok("{}", JSON).build();
+ }
+
+ /** Builds an OTLP partial-success response with a message describing what failed. */
+ static Response partialFailure(final boolean protobuf, final String message) {
+ LOGGER.warn("OTLP request partially failed: {}", message);
+ if (protobuf) {
+ // Empty protobuf; the partial_success field would go here but keeping it minimal is fine.
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .type(PROTOBUF)
+ .entity(new byte[0])
+ .build();
+ }
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .type(JSON)
+ .entity("{\"message\":\"" + escapeJson(message) + "\"}")
+ .build();
+ }
+
+ /** Builds a client error response, e.g. on malformed input. */
+ static Response badRequest(final boolean protobuf, final String message) {
+ LOGGER.debug("OTLP bad request: {}", message);
+ if (protobuf) {
+ return Response.status(Response.Status.BAD_REQUEST)
+ .type(PROTOBUF)
+ .entity(new byte[0])
+ .build();
+ }
+ return Response.status(Response.Status.BAD_REQUEST)
+ .type(JSON)
+ .entity("{\"message\":\"" + escapeJson(message) + "\"}")
+ .build();
+ }
+
+ /** Returns {@link MediaType#APPLICATION_OCTET_STREAM} compatible default if nothing matches. */
+ static String responseContentType(final boolean protobuf) {
+ return protobuf ? PROTOBUF : JSON;
+ }
+
+ private static String escapeJson(final String s) {
+ if (s == null) {
+ return "";
+ }
+ return s.replace("\\", "\\\\").replace("\"", "\\\"");
+ }
+}
diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpIngestor.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpIngestor.java
new file mode 100644
index 0000000000000..46d2b952d423a
--- /dev/null
+++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpIngestor.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rest.protocol.otlp.v1;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Packs a column-oriented batch of OTLP rows into an {@link InsertTabletStatement} and hands it off
+ * to the {@link Coordinator} for the table model. The receiver builds rows via {@link
+ * OtlpTableBatch}; this class only cares about serializing those rows in the layout IoTDB expects.
+ */
+final class OtlpIngestor {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(OtlpIngestor.class);
+ private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+ private static final Binary EMPTY_BINARY = new Binary("".getBytes(StandardCharsets.UTF_8));
+
+ private OtlpIngestor() {}
+
+ /** Inserts a batch. No-ops if batch is empty. Returns true on success. */
+ static boolean insert(
+ final String database, final IClientSession session, final OtlpTableBatch batch) {
+ if (batch.rowCount() == 0) {
+ return true;
+ }
+
+ final SessionManager sessionManager = SessionManager.getInstance();
+ final long queryId = sessionManager.requestQueryId();
+ try {
+ session.setDatabaseName(database);
+ session.setSqlDialect(IClientSession.SqlDialect.TABLE);
+
+ final InsertTabletStatement statement = buildStatement(batch);
+ final Metadata metadata = LocalExecutionPlanner.getInstance().metadata;
+
+ // The legacy 8-arg overload takes the tree-model Statement subtype (InsertTabletStatement).
+ // The newer overloads require a relational.sql.ast.Statement, which does not apply here.
+ // This is the same overload the REST /insertTablet and MQTT handlers use.
+ @SuppressWarnings("deprecation")
+ final ExecutionResult result =
+ Coordinator.getInstance()
+ .executeForTableModel(
+ statement,
+ new SqlParser(),
+ session,
+ queryId,
+ sessionManager.getSessionInfo(session),
+ "",
+ metadata,
+ CONFIG.getQueryTimeoutThreshold());
+ final TSStatus status = result.status;
+ final int code = status.getCode();
+ if (code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ LOGGER.warn(
+ "OTLP insert into {}.{} failed: code={}, message={}, rows={}",
+ database,
+ batch.tableName(),
+ code,
+ status.getMessage(),
+ batch.rowCount());
+ return false;
+ }
+ return true;
+ } catch (final Exception e) {
+ LOGGER.warn("OTLP insert into {}.{} threw", database, batch.tableName(), e);
+ return false;
+ } finally {
+ Coordinator.getInstance().cleanupQueryExecution(queryId);
+ }
+ }
+
+ private static InsertTabletStatement buildStatement(final OtlpTableBatch batch) {
+ final InsertTabletStatement statement = new InsertTabletStatement();
+ statement.setDevicePath(new PartialPath(batch.tableName(), false));
+ statement.setMeasurements(batch.columnNames());
+ statement.setTimes(batch.times());
+ statement.setDataTypes(batch.dataTypes());
+ statement.setColumnCategories(batch.columnCategories());
+ statement.setColumns(batch.columnValues());
+ statement.setBitMaps(batch.bitMaps());
+ statement.setRowCount(batch.rowCount());
+ statement.setAligned(false);
+ statement.setWriteToTable(true);
+ return statement;
+ }
+
+ /**
+ * Column-major row buffer with a fixed schema. Callers push rows with {@link #startRow(long)}
+ * followed by {@code set*} calls, then hand the batch to {@link OtlpIngestor#insert}. Unset cells
+ * become IoTDB nulls by way of the BitMap: the default IoTDB convention is that a fresh BitMap
+ * has every position cleared (= not-null), so we pre-mark every slot and let the per-column
+ * {@code set*} methods do nothing on null input — this way any slot that ends up untouched stays
+ * marked.
+ */
+ static final class OtlpTableBatch {
+ private final String tableName;
+ private final String[] columnNames;
+ private final TSDataType[] dataTypes;
+ private final TsTableColumnCategory[] columnCategories;
+ private final int capacity;
+
+ private final long[] times;
+ private final Object[] columnValues;
+ private final BitMap[] bitMaps;
+ private int rowCount;
+
+ OtlpTableBatch(
+ final String tableName,
+ final String[] columnNames,
+ final TSDataType[] dataTypes,
+ final TsTableColumnCategory[] columnCategories,
+ final int capacity) {
+ this.tableName = tableName;
+ this.columnNames = columnNames;
+ this.dataTypes = dataTypes;
+ this.columnCategories = columnCategories;
+ this.capacity = Math.max(capacity, 1);
+ this.times = new long[this.capacity];
+ this.columnValues = new Object[columnNames.length];
+ this.bitMaps = new BitMap[columnNames.length];
+ for (int c = 0; c < columnNames.length; c++) {
+ this.columnValues[c] = allocateColumn(dataTypes[c], this.capacity);
+ this.bitMaps[c] = null;
+ }
+ }
+
+ /** Begins a new row at {@code time}. Must be followed by set* calls for every non-null cell. */
+ void startRow(final long time) {
+ if (rowCount >= capacity) {
+ throw new IllegalStateException(
+ "OtlpTableBatch overflow: row " + rowCount + " >= capacity " + capacity);
+ }
+ times[rowCount] = time;
+ rowCount++;
+ }
+
+ void setString(final int column, final String value) {
+ if (value == null) {
+ return;
+ }
+ final Binary[] arr = (Binary[]) columnValues[column];
+ arr[rowCount - 1] =
+ value.isEmpty() ? EMPTY_BINARY : new Binary(value.getBytes(StandardCharsets.UTF_8));
+ }
+
+ void setLong(final int column, final long value) {
+ final long[] arr = (long[]) columnValues[column];
+ arr[rowCount - 1] = value;
+ }
+
+ void setInt(final int column, final int value) {
+ final int[] arr = (int[]) columnValues[column];
+ arr[rowCount - 1] = value;
+ }
+
+ void setDouble(final int column, final double value) {
+ final double[] arr = (double[]) columnValues[column];
+ arr[rowCount - 1] = value;
+ }
+
+ String tableName() {
+ return tableName;
+ }
+
+ String[] columnNames() {
+ return columnNames;
+ }
+
+ TSDataType[] dataTypes() {
+ return dataTypes;
+ }
+
+ TsTableColumnCategory[] columnCategories() {
+ return columnCategories;
+ }
+
+ int rowCount() {
+ return rowCount;
+ }
+
+ long[] times() {
+ if (rowCount == capacity) {
+ return times;
+ }
+ final long[] trimmed = new long[rowCount];
+ System.arraycopy(times, 0, trimmed, 0, rowCount);
+ return trimmed;
+ }
+
+ Object[] columnValues() {
+ // For string columns, replace any unassigned (null) slot with EMPTY_BINARY so the writer
+ // has a real Binary object to work with; the bit stays cleared so the cell still reads as
+ // not-null. If you need true nullability, mark the bit explicitly before calling this.
+ for (int c = 0; c < columnNames.length; c++) {
+ if (columnValues[c] instanceof Binary[]) {
+ final Binary[] arr = (Binary[]) columnValues[c];
+ for (int r = 0; r < rowCount; r++) {
+ if (arr[r] == null) {
+ arr[r] = EMPTY_BINARY;
+ }
+ }
+ }
+ }
+ if (rowCount == capacity) {
+ return columnValues;
+ }
+ final Object[] trimmed = new Object[columnNames.length];
+ for (int c = 0; c < columnNames.length; c++) {
+ trimmed[c] = trimColumn(columnValues[c], dataTypes[c], rowCount);
+ }
+ return trimmed;
+ }
+
+ BitMap[] bitMaps() {
+ // Fresh BitMaps: every bit zero = every cell not-null. OTLP rows always provide scalar
+ // defaults (empty strings, 0 ints) rather than true nulls, so we never mark anything.
+ final BitMap[] out = new BitMap[columnNames.length];
+ for (int c = 0; c < columnNames.length; c++) {
+ out[c] = new BitMap(rowCount);
+ }
+ return out;
+ }
+
+ private static Object allocateColumn(final TSDataType type, final int capacity) {
+ switch (type) {
+ case INT32:
+ return new int[capacity];
+ case INT64:
+ case TIMESTAMP:
+ return new long[capacity];
+ case FLOAT:
+ return new float[capacity];
+ case DOUBLE:
+ return new double[capacity];
+ case BOOLEAN:
+ return new boolean[capacity];
+ case TEXT:
+ case STRING:
+ case BLOB:
+ return new Binary[capacity];
+ default:
+ throw new UnsupportedOperationException("OTLP: unsupported TSDataType " + type);
+ }
+ }
+
+ private static Object trimColumn(final Object source, final TSDataType type, final int length) {
+ switch (type) {
+ case INT32:
+ {
+ final int[] trimmed = new int[length];
+ System.arraycopy(source, 0, trimmed, 0, length);
+ return trimmed;
+ }
+ case INT64:
+ case TIMESTAMP:
+ {
+ final long[] trimmed = new long[length];
+ System.arraycopy(source, 0, trimmed, 0, length);
+ return trimmed;
+ }
+ case FLOAT:
+ {
+ final float[] trimmed = new float[length];
+ System.arraycopy(source, 0, trimmed, 0, length);
+ return trimmed;
+ }
+ case DOUBLE:
+ {
+ final double[] trimmed = new double[length];
+ System.arraycopy(source, 0, trimmed, 0, length);
+ return trimmed;
+ }
+ case BOOLEAN:
+ {
+ final boolean[] trimmed = new boolean[length];
+ System.arraycopy(source, 0, trimmed, 0, length);
+ return trimmed;
+ }
+ case TEXT:
+ case STRING:
+ case BLOB:
+ {
+ final Binary[] trimmed = new Binary[length];
+ System.arraycopy(source, 0, trimmed, 0, length);
+ return trimmed;
+ }
+ default:
+ throw new UnsupportedOperationException("OTLP: unsupported TSDataType " + type);
+ }
+ }
+ }
+}
diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpLogsConverter.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpLogsConverter.java
new file mode 100644
index 0000000000000..ff507146a6c06
--- /dev/null
+++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpLogsConverter.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rest.protocol.otlp.v1;
+
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.rest.protocol.otlp.v1.OtlpIngestor.OtlpTableBatch;
+
+import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.logs.v1.LogRecord;
+import io.opentelemetry.proto.logs.v1.ResourceLogs;
+import io.opentelemetry.proto.logs.v1.ScopeLogs;
+import org.apache.tsfile.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+final class OtlpLogsConverter {
+
+ private static final String TABLE = "logs";
+
+ // TAG(3) + ATTRIBUTE(4) + FIELD(18) = 25 columns
+ private static final String[] COLUMN_NAMES = {
+ "user_id",
+ "session_id",
+ "event_name",
+ "terminal_type",
+ "service_version",
+ "os_type",
+ "host_arch",
+ "prompt_id",
+ "event_sequence",
+ "body",
+ "prompt_length",
+ "prompt",
+ "model",
+ "cost_usd",
+ "duration_ms",
+ "input_tokens",
+ "output_tokens",
+ "cache_read_tokens",
+ "cache_creation_tokens",
+ "request_id",
+ "speed",
+ "error",
+ "status_code",
+ "attempt",
+ "tool_name",
+ "success",
+ "tool_duration_ms",
+ "decision",
+ "decision_source",
+ "tool_result_size_bytes"
+ };
+ private static final TSDataType[] DATA_TYPES = {
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.INT32,
+ TSDataType.STRING,
+ TSDataType.INT32,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.DOUBLE,
+ TSDataType.INT64,
+ TSDataType.INT64,
+ TSDataType.INT64,
+ TSDataType.INT64,
+ TSDataType.INT64,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.INT32,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.INT64,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.INT64
+ };
+ private static final TsTableColumnCategory[] CATEGORIES = {
+ TsTableColumnCategory.TAG,
+ TsTableColumnCategory.TAG,
+ TsTableColumnCategory.TAG,
+ TsTableColumnCategory.ATTRIBUTE,
+ TsTableColumnCategory.ATTRIBUTE,
+ TsTableColumnCategory.ATTRIBUTE,
+ TsTableColumnCategory.ATTRIBUTE,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD
+ };
+
+ private static final int C_USER_ID = 0;
+ private static final int C_SESSION_ID = 1;
+ private static final int C_EVENT_NAME = 2;
+ private static final int C_TERMINAL_TYPE = 3;
+ private static final int C_SERVICE_VERSION = 4;
+ private static final int C_OS_TYPE = 5;
+ private static final int C_HOST_ARCH = 6;
+ private static final int C_PROMPT_ID = 7;
+ private static final int C_EVENT_SEQUENCE = 8;
+ private static final int C_BODY = 9;
+ private static final int C_PROMPT_LENGTH = 10;
+ private static final int C_PROMPT = 11;
+ private static final int C_MODEL = 12;
+ private static final int C_COST_USD = 13;
+ private static final int C_DURATION_MS = 14;
+ private static final int C_INPUT_TOKENS = 15;
+ private static final int C_OUTPUT_TOKENS = 16;
+ private static final int C_CACHE_READ_TOKENS = 17;
+ private static final int C_CACHE_CREATION_TOKENS = 18;
+ private static final int C_REQUEST_ID = 19;
+ private static final int C_SPEED = 20;
+ private static final int C_ERROR = 21;
+ private static final int C_STATUS_CODE = 22;
+ private static final int C_ATTEMPT = 23;
+ private static final int C_TOOL_NAME = 24;
+ private static final int C_SUCCESS = 25;
+ private static final int C_TOOL_DURATION_MS = 26;
+ private static final int C_DECISION = 27;
+ private static final int C_DECISION_SOURCE = 28;
+ private static final int C_TOOL_RESULT_SIZE_BYTES = 29;
+
+ private OtlpLogsConverter() {}
+
+ static boolean convertAndInsert(
+ final OtlpService service, final ExportLogsServiceRequest request) {
+ final Map> byDb = new HashMap<>();
+ for (final ResourceLogs rl : request.getResourceLogsList()) {
+ final String db =
+ OtlpConverter.deriveDatabaseName(
+ OtlpConverter.extractServiceName(rl.getResource().getAttributesList()));
+ byDb.computeIfAbsent(db, k -> new ArrayList<>()).add(rl);
+ }
+ boolean allOk = true;
+ for (final Map.Entry> entry : byDb.entrySet()) {
+ if (!insertForDatabase(service, entry.getKey(), entry.getValue())) {
+ allOk = false;
+ }
+ }
+ return allOk;
+ }
+
+ private static boolean insertForDatabase(
+ final OtlpService service, final String db, final List rls) {
+ int capacity = 0;
+ for (final ResourceLogs rl : rls) {
+ for (final ScopeLogs sl : rl.getScopeLogsList()) {
+ capacity += sl.getLogRecordsCount();
+ }
+ }
+ if (capacity == 0) {
+ return true;
+ }
+ final IClientSession session = service.sessionFor(db);
+ final OtlpTableBatch batch =
+ new OtlpTableBatch(TABLE, COLUMN_NAMES, DATA_TYPES, CATEGORIES, capacity);
+
+ for (final ResourceLogs rl : rls) {
+ final List resAttrs = rl.getResource().getAttributesList();
+ final String serviceVersion = OtlpConverter.extractAttribute(resAttrs, "service.version");
+ final String osType = OtlpConverter.extractAttribute(resAttrs, "os.type");
+ final String hostArch = OtlpConverter.extractAttribute(resAttrs, "host.arch");
+ for (final ScopeLogs sl : rl.getScopeLogsList()) {
+ for (final LogRecord log : sl.getLogRecordsList()) {
+ final long ts =
+ log.getTimeUnixNano() != 0 ? log.getTimeUnixNano() : log.getObservedTimeUnixNano();
+ batch.startRow(OtlpConverter.nanoToDbPrecision(ts));
+
+ final List attrs = log.getAttributesList();
+ final String eventName = OtlpConverter.extractAttribute(attrs, "event.name");
+
+ // TAGs
+ batch.setString(C_USER_ID, OtlpConverter.extractAttribute(attrs, "user.id"));
+ batch.setString(C_SESSION_ID, OtlpConverter.extractAttribute(attrs, "session.id"));
+ batch.setString(C_EVENT_NAME, eventName);
+ // ATTRIBUTEs
+ batch.setString(C_TERMINAL_TYPE, OtlpConverter.extractAttribute(attrs, "terminal.type"));
+ batch.setString(C_SERVICE_VERSION, serviceVersion);
+ batch.setString(C_OS_TYPE, osType);
+ batch.setString(C_HOST_ARCH, hostArch);
+ // Common FIELDs
+ batch.setString(C_PROMPT_ID, OtlpConverter.extractAttribute(attrs, "prompt.id"));
+ setIntFromAttr(batch, C_EVENT_SEQUENCE, attrs, "event.sequence");
+ batch.setString(C_BODY, bodyToString(log.getBody()));
+
+ // Event-specific FIELDs
+ if ("user_prompt".equals(eventName)) {
+ setIntFromAttr(batch, C_PROMPT_LENGTH, attrs, "prompt_length");
+ batch.setString(C_PROMPT, OtlpConverter.extractAttribute(attrs, "prompt"));
+ } else if ("api_request".equals(eventName)) {
+ batch.setString(C_MODEL, OtlpConverter.extractAttribute(attrs, "model"));
+ setDoubleFromAttr(batch, C_COST_USD, attrs, "cost_usd");
+ setLongFromAttr(batch, C_DURATION_MS, attrs, "duration_ms");
+ setLongFromAttr(batch, C_INPUT_TOKENS, attrs, "input_tokens");
+ setLongFromAttr(batch, C_OUTPUT_TOKENS, attrs, "output_tokens");
+ setLongFromAttr(batch, C_CACHE_READ_TOKENS, attrs, "cache_read_tokens");
+ setLongFromAttr(batch, C_CACHE_CREATION_TOKENS, attrs, "cache_creation_tokens");
+ batch.setString(C_REQUEST_ID, OtlpConverter.extractAttribute(attrs, "request_id"));
+ batch.setString(C_SPEED, OtlpConverter.extractAttribute(attrs, "speed"));
+ } else if ("api_error".equals(eventName)) {
+ batch.setString(C_MODEL, OtlpConverter.extractAttribute(attrs, "model"));
+ batch.setString(C_ERROR, OtlpConverter.extractAttribute(attrs, "error"));
+ batch.setString(C_STATUS_CODE, OtlpConverter.extractAttribute(attrs, "status_code"));
+ setLongFromAttr(batch, C_DURATION_MS, attrs, "duration_ms");
+ setIntFromAttr(batch, C_ATTEMPT, attrs, "attempt");
+ batch.setString(C_REQUEST_ID, OtlpConverter.extractAttribute(attrs, "request_id"));
+ batch.setString(C_SPEED, OtlpConverter.extractAttribute(attrs, "speed"));
+ } else if ("tool_result".equals(eventName)) {
+ batch.setString(C_TOOL_NAME, OtlpConverter.extractAttribute(attrs, "tool_name"));
+ batch.setString(C_SUCCESS, OtlpConverter.extractAttribute(attrs, "success"));
+ setLongFromAttr(batch, C_TOOL_DURATION_MS, attrs, "duration_ms");
+ batch.setString(
+ C_DECISION_SOURCE, OtlpConverter.extractAttribute(attrs, "decision_source"));
+ batch.setString(C_DECISION, OtlpConverter.extractAttribute(attrs, "decision_type"));
+ setLongFromAttr(batch, C_TOOL_RESULT_SIZE_BYTES, attrs, "tool_result_size_bytes");
+ } else if ("tool_decision".equals(eventName)) {
+ batch.setString(C_TOOL_NAME, OtlpConverter.extractAttribute(attrs, "tool_name"));
+ batch.setString(C_DECISION, OtlpConverter.extractAttribute(attrs, "decision"));
+ batch.setString(C_DECISION_SOURCE, OtlpConverter.extractAttribute(attrs, "source"));
+ }
+ }
+ }
+ }
+ return OtlpIngestor.insert(db, session, batch);
+ }
+
+ private static void setLongFromAttr(
+ final OtlpTableBatch batch, final int col, final List attrs, final String key) {
+ final String v = OtlpConverter.extractAttribute(attrs, key);
+ if (!v.isEmpty()) {
+ try {
+ batch.setLong(col, Long.parseLong(v));
+ } catch (final NumberFormatException ignored) {
+ }
+ }
+ }
+
+ private static void setIntFromAttr(
+ final OtlpTableBatch batch, final int col, final List attrs, final String key) {
+ final String v = OtlpConverter.extractAttribute(attrs, key);
+ if (!v.isEmpty()) {
+ try {
+ batch.setInt(col, Integer.parseInt(v));
+ } catch (final NumberFormatException ignored) {
+ }
+ }
+ }
+
+ private static void setDoubleFromAttr(
+ final OtlpTableBatch batch, final int col, final List attrs, final String key) {
+ final String v = OtlpConverter.extractAttribute(attrs, key);
+ if (!v.isEmpty()) {
+ try {
+ batch.setDouble(col, Double.parseDouble(v));
+ } catch (final NumberFormatException ignored) {
+ }
+ }
+ }
+
+ private static String bodyToString(final AnyValue body) {
+ if (body == null) {
+ return "";
+ }
+ switch (body.getValueCase()) {
+ case STRING_VALUE:
+ return body.getStringValue();
+ case BOOL_VALUE:
+ return Boolean.toString(body.getBoolValue());
+ case INT_VALUE:
+ return Long.toString(body.getIntValue());
+ case DOUBLE_VALUE:
+ return Double.toString(body.getDoubleValue());
+ case BYTES_VALUE:
+ return OtlpConverter.bytesToHex(body.getBytesValue());
+ default:
+ return body.toString();
+ }
+ }
+}
diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpLogsResource.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpLogsResource.java
new file mode 100644
index 0000000000000..d3d81d0188280
--- /dev/null
+++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpLogsResource.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rest.protocol.otlp.v1;
+
+import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
+import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+
+/** OTLP/HTTP logs endpoint, served at {@code /rest/otlp/v1/logs}. */
+@Path("/rest/v1/otlp/v1/logs")
+public class OtlpLogsResource {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(OtlpLogsResource.class);
+
+ @POST
+ public Response export(@Context final HttpHeaders headers, final byte[] body) {
+ final boolean protobuf = OtlpHttp.isProtobuf(headers);
+ try {
+ final ExportLogsServiceRequest request =
+ OtlpHttp.parse(body, ExportLogsServiceRequest.newBuilder(), protobuf).build();
+ final boolean ok = OtlpService.getInstance().ingestLogs(request);
+ if (!ok) {
+ return OtlpHttp.partialFailure(protobuf, "OTLP logs insert failed");
+ }
+ return OtlpHttp.success(ExportLogsServiceResponse.getDefaultInstance(), protobuf);
+ } catch (final Exception e) {
+ LOGGER.warn("OTLP logs export failed", e);
+ return OtlpHttp.badRequest(protobuf, e.getMessage());
+ }
+ }
+}
diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpMetricsConverter.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpMetricsConverter.java
new file mode 100644
index 0000000000000..0e27b19070ad0
--- /dev/null
+++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpMetricsConverter.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rest.protocol.otlp.v1;
+
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.rest.protocol.otlp.v1.OtlpIngestor.OtlpTableBatch;
+
+import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.HistogramDataPoint;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
+import org.apache.tsfile.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+final class OtlpMetricsConverter {
+
+ private static final String TABLE = "metrics";
+
+ // TAG(5) + ATTRIBUTE(8) + FIELD(1) = 14 columns
+ private static final String[] COLUMN_NAMES = {
+ "user_id",
+ "session_id",
+ "metric_name",
+ "model",
+ "type",
+ "terminal_type",
+ "service_version",
+ "os_type",
+ "os_version",
+ "host_arch",
+ "unit",
+ "metric_type",
+ "description",
+ "value"
+ };
+ private static final TSDataType[] DATA_TYPES = {
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.DOUBLE
+ };
+ private static final TsTableColumnCategory[] CATEGORIES = {
+ TsTableColumnCategory.TAG,
+ TsTableColumnCategory.TAG,
+ TsTableColumnCategory.TAG,
+ TsTableColumnCategory.TAG,
+ TsTableColumnCategory.TAG,
+ TsTableColumnCategory.ATTRIBUTE,
+ TsTableColumnCategory.ATTRIBUTE,
+ TsTableColumnCategory.ATTRIBUTE,
+ TsTableColumnCategory.ATTRIBUTE,
+ TsTableColumnCategory.ATTRIBUTE,
+ TsTableColumnCategory.ATTRIBUTE,
+ TsTableColumnCategory.ATTRIBUTE,
+ TsTableColumnCategory.ATTRIBUTE,
+ TsTableColumnCategory.FIELD
+ };
+
+ // Column indices
+ private static final int C_USER_ID = 0;
+ private static final int C_SESSION_ID = 1;
+ private static final int C_METRIC_NAME = 2;
+ private static final int C_MODEL = 3;
+ private static final int C_TYPE = 4;
+ private static final int C_TERMINAL_TYPE = 5;
+ private static final int C_SERVICE_VERSION = 6;
+ private static final int C_OS_TYPE = 7;
+ private static final int C_OS_VERSION = 8;
+ private static final int C_HOST_ARCH = 9;
+ private static final int C_UNIT = 10;
+ private static final int C_METRIC_TYPE = 11;
+ private static final int C_DESCRIPTION = 12;
+ private static final int C_VALUE = 13;
+
+ private OtlpMetricsConverter() {}
+
+ static boolean convertAndInsert(
+ final OtlpService service, final ExportMetricsServiceRequest request) {
+ final Map> byDb = new HashMap<>();
+ for (final ResourceMetrics rm : request.getResourceMetricsList()) {
+ final String db =
+ OtlpConverter.deriveDatabaseName(
+ OtlpConverter.extractServiceName(rm.getResource().getAttributesList()));
+ byDb.computeIfAbsent(db, k -> new ArrayList<>()).add(rm);
+ }
+ boolean allOk = true;
+ for (final Map.Entry> entry : byDb.entrySet()) {
+ if (!insertForDatabase(service, entry.getKey(), entry.getValue())) {
+ allOk = false;
+ }
+ }
+ return allOk;
+ }
+
+ private static boolean insertForDatabase(
+ final OtlpService service, final String db, final List rms) {
+ int capacity = 0;
+ for (final ResourceMetrics rm : rms) {
+ for (final ScopeMetrics sm : rm.getScopeMetricsList()) {
+ for (final Metric m : sm.getMetricsList()) {
+ capacity += countDataPoints(m);
+ }
+ }
+ }
+ if (capacity == 0) {
+ return true;
+ }
+ final IClientSession session = service.sessionFor(db);
+ final OtlpTableBatch batch =
+ new OtlpTableBatch(TABLE, COLUMN_NAMES, DATA_TYPES, CATEGORIES, capacity);
+
+ for (final ResourceMetrics rm : rms) {
+ final List resAttrs = rm.getResource().getAttributesList();
+ final String serviceVersion = OtlpConverter.extractAttribute(resAttrs, "service.version");
+ final String osType = OtlpConverter.extractAttribute(resAttrs, "os.type");
+ final String osVersion = OtlpConverter.extractAttribute(resAttrs, "os.version");
+ final String hostArch = OtlpConverter.extractAttribute(resAttrs, "host.arch");
+ for (final ScopeMetrics sm : rm.getScopeMetricsList()) {
+ for (final Metric metric : sm.getMetricsList()) {
+ writeMetric(batch, metric, serviceVersion, osType, osVersion, hostArch);
+ }
+ }
+ }
+ return OtlpIngestor.insert(db, session, batch);
+ }
+
+ private static void writeMetric(
+ final OtlpTableBatch batch,
+ final Metric metric,
+ final String serviceVersion,
+ final String osType,
+ final String osVersion,
+ final String hostArch) {
+ switch (metric.getDataCase()) {
+ case GAUGE:
+ for (final NumberDataPoint dp : metric.getGauge().getDataPointsList()) {
+ writeNumberPoint(batch, metric, "gauge", dp, serviceVersion, osType, osVersion, hostArch);
+ }
+ break;
+ case SUM:
+ for (final NumberDataPoint dp : metric.getSum().getDataPointsList()) {
+ writeNumberPoint(batch, metric, "sum", dp, serviceVersion, osType, osVersion, hostArch);
+ }
+ break;
+ case HISTOGRAM:
+ for (final HistogramDataPoint dp : metric.getHistogram().getDataPointsList()) {
+ writeRow(
+ batch,
+ dp.getTimeUnixNano(),
+ dp.getAttributesList(),
+ metric.getName(),
+ "histogram",
+ dp.hasSum() ? dp.getSum() : 0.0,
+ metric.getUnit(),
+ metric.getDescription(),
+ serviceVersion,
+ osType,
+ osVersion,
+ hostArch);
+ }
+ break;
+ case EXPONENTIAL_HISTOGRAM:
+ for (final io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint dp :
+ metric.getExponentialHistogram().getDataPointsList()) {
+ writeRow(
+ batch,
+ dp.getTimeUnixNano(),
+ dp.getAttributesList(),
+ metric.getName(),
+ "exponential_histogram",
+ dp.hasSum() ? dp.getSum() : 0.0,
+ metric.getUnit(),
+ metric.getDescription(),
+ serviceVersion,
+ osType,
+ osVersion,
+ hostArch);
+ }
+ break;
+ case SUMMARY:
+ for (final SummaryDataPoint dp : metric.getSummary().getDataPointsList()) {
+ writeRow(
+ batch,
+ dp.getTimeUnixNano(),
+ dp.getAttributesList(),
+ metric.getName(),
+ "summary",
+ dp.getSum(),
+ metric.getUnit(),
+ metric.getDescription(),
+ serviceVersion,
+ osType,
+ osVersion,
+ hostArch);
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ private static void writeNumberPoint(
+ final OtlpTableBatch batch,
+ final Metric metric,
+ final String metricType,
+ final NumberDataPoint dp,
+ final String serviceVersion,
+ final String osType,
+ final String osVersion,
+ final String hostArch) {
+ writeRow(
+ batch,
+ dp.getTimeUnixNano(),
+ dp.getAttributesList(),
+ metric.getName(),
+ metricType,
+ numericValue(dp),
+ metric.getUnit(),
+ metric.getDescription(),
+ serviceVersion,
+ osType,
+ osVersion,
+ hostArch);
+ }
+
+ private static void writeRow(
+ final OtlpTableBatch batch,
+ final long timeUnixNano,
+ final List dpAttrs,
+ final String metricName,
+ final String metricType,
+ final double value,
+ final String unit,
+ final String description,
+ final String serviceVersion,
+ final String osType,
+ final String osVersion,
+ final String hostArch) {
+ batch.startRow(OtlpConverter.nanoToDbPrecision(timeUnixNano));
+ // TAGs
+ batch.setString(C_USER_ID, OtlpConverter.extractAttribute(dpAttrs, "user.id"));
+ batch.setString(C_SESSION_ID, OtlpConverter.extractAttribute(dpAttrs, "session.id"));
+ batch.setString(C_METRIC_NAME, metricName);
+ batch.setString(C_MODEL, OtlpConverter.extractAttribute(dpAttrs, "model"));
+ batch.setString(C_TYPE, OtlpConverter.extractAttribute(dpAttrs, "type"));
+ // ATTRIBUTEs
+ batch.setString(C_TERMINAL_TYPE, OtlpConverter.extractAttribute(dpAttrs, "terminal.type"));
+ batch.setString(C_SERVICE_VERSION, serviceVersion);
+ batch.setString(C_OS_TYPE, osType);
+ batch.setString(C_OS_VERSION, osVersion);
+ batch.setString(C_HOST_ARCH, hostArch);
+ batch.setString(C_UNIT, unit);
+ batch.setString(C_METRIC_TYPE, metricType);
+ batch.setString(C_DESCRIPTION, description);
+ // FIELD
+ batch.setDouble(C_VALUE, value);
+ }
+
+ private static double numericValue(final NumberDataPoint dp) {
+ switch (dp.getValueCase()) {
+ case AS_DOUBLE:
+ return dp.getAsDouble();
+ case AS_INT:
+ return (double) dp.getAsInt();
+ default:
+ return 0.0;
+ }
+ }
+
+ private static int countDataPoints(final Metric metric) {
+ switch (metric.getDataCase()) {
+ case GAUGE:
+ return metric.getGauge().getDataPointsCount();
+ case SUM:
+ return metric.getSum().getDataPointsCount();
+ case HISTOGRAM:
+ return metric.getHistogram().getDataPointsCount();
+ case EXPONENTIAL_HISTOGRAM:
+ return metric.getExponentialHistogram().getDataPointsCount();
+ case SUMMARY:
+ return metric.getSummary().getDataPointsCount();
+ default:
+ return 0;
+ }
+ }
+}
diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpMetricsResource.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpMetricsResource.java
new file mode 100644
index 0000000000000..50945ac2cf5d7
--- /dev/null
+++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpMetricsResource.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rest.protocol.otlp.v1;
+
+import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
+import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+
+/** OTLP/HTTP metrics endpoint, served at {@code /rest/otlp/v1/metrics}. */
+@Path("/rest/v1/otlp/v1/metrics")
+public class OtlpMetricsResource {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(OtlpMetricsResource.class);
+
+ @POST
+ public Response export(@Context final HttpHeaders headers, final byte[] body) {
+ final boolean protobuf = OtlpHttp.isProtobuf(headers);
+ try {
+ final ExportMetricsServiceRequest request =
+ OtlpHttp.parse(body, ExportMetricsServiceRequest.newBuilder(), protobuf).build();
+ final boolean ok = OtlpService.getInstance().ingestMetrics(request);
+ if (!ok) {
+ return OtlpHttp.partialFailure(protobuf, "OTLP metrics insert failed");
+ }
+ return OtlpHttp.success(ExportMetricsServiceResponse.getDefaultInstance(), protobuf);
+ } catch (final Exception e) {
+ LOGGER.warn("OTLP metrics export failed", e);
+ return OtlpHttp.badRequest(protobuf, e.getMessage());
+ }
+ }
+}
diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpSchemaInitializer.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpSchemaInitializer.java
new file mode 100644
index 0000000000000..05b32d924b085
--- /dev/null
+++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpSchemaInitializer.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rest.protocol.otlp.v1;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Ensures the {@code claude_code} database and the {@code traces}, {@code metrics}, {@code logs}
+ * tables exist before the OTLP receiver writes its first batch. Idempotent via {@code CREATE ... IF
+ * NOT EXISTS}, so running it once per ingest call is cheap after the initial DDL has been
+ * replicated across the cluster.
+ */
+final class OtlpSchemaInitializer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(OtlpSchemaInitializer.class);
+
+ private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
+ private OtlpSchemaInitializer() {}
+
+ static void initialize(final String database, final IClientSession session) {
+ runDdl(session, null, "CREATE DATABASE IF NOT EXISTS " + database);
+
+ runDdl(
+ session,
+ database,
+ "CREATE TABLE IF NOT EXISTS metrics ("
+ + "user_id STRING TAG, "
+ + "session_id STRING TAG, "
+ + "metric_name STRING TAG, "
+ + "model STRING TAG, "
+ + "type STRING TAG, "
+ + "terminal_type STRING ATTRIBUTE, "
+ + "service_version STRING ATTRIBUTE, "
+ + "os_type STRING ATTRIBUTE, "
+ + "os_version STRING ATTRIBUTE, "
+ + "host_arch STRING ATTRIBUTE, "
+ + "unit STRING ATTRIBUTE, "
+ + "metric_type STRING ATTRIBUTE, "
+ + "description STRING ATTRIBUTE, "
+ + "value DOUBLE FIELD)");
+
+ runDdl(
+ session,
+ database,
+ "CREATE TABLE IF NOT EXISTS logs ("
+ + "user_id STRING TAG, "
+ + "session_id STRING TAG, "
+ + "event_name STRING TAG, "
+ + "terminal_type STRING ATTRIBUTE, "
+ + "service_version STRING ATTRIBUTE, "
+ + "os_type STRING ATTRIBUTE, "
+ + "host_arch STRING ATTRIBUTE, "
+ + "prompt_id STRING FIELD, "
+ + "event_sequence INT32 FIELD, "
+ + "body STRING FIELD, "
+ + "prompt_length INT32 FIELD, "
+ + "prompt STRING FIELD, "
+ + "model STRING FIELD, "
+ + "cost_usd DOUBLE FIELD, "
+ + "duration_ms INT64 FIELD, "
+ + "input_tokens INT64 FIELD, "
+ + "output_tokens INT64 FIELD, "
+ + "cache_read_tokens INT64 FIELD, "
+ + "cache_creation_tokens INT64 FIELD, "
+ + "request_id STRING FIELD, "
+ + "speed STRING FIELD, "
+ + "error STRING FIELD, "
+ + "status_code STRING FIELD, "
+ + "attempt INT32 FIELD, "
+ + "tool_name STRING FIELD, "
+ + "success STRING FIELD, "
+ + "tool_duration_ms INT64 FIELD, "
+ + "decision STRING FIELD, "
+ + "decision_source STRING FIELD, "
+ + "tool_result_size_bytes INT64 FIELD)");
+
+ runDdl(
+ session,
+ database,
+ "CREATE TABLE IF NOT EXISTS traces ("
+ + "service_name STRING TAG, "
+ + "span_name STRING TAG, "
+ + "service_version STRING ATTRIBUTE, "
+ + "os_type STRING ATTRIBUTE, "
+ + "host_arch STRING ATTRIBUTE, "
+ + "trace_id STRING FIELD, "
+ + "span_id STRING FIELD, "
+ + "parent_span_id STRING FIELD, "
+ + "span_kind STRING FIELD, "
+ + "start_time_unix_nano INT64 FIELD, "
+ + "end_time_unix_nano INT64 FIELD, "
+ + "duration_nano INT64 FIELD, "
+ + "status_code STRING FIELD, "
+ + "status_message STRING FIELD, "
+ + "attributes STRING FIELD, "
+ + "resource_attributes STRING FIELD, "
+ + "scope_name STRING FIELD, "
+ + "scope_version STRING FIELD)");
+ }
+
+ private static void runDdl(
+ final IClientSession session, final String database, final String sql) {
+ final SessionManager sessionManager = SessionManager.getInstance();
+ final Long queryId = sessionManager.requestQueryId();
+ try {
+ if (database == null) {
+ // CREATE DATABASE must not set a current database; clear it so the parser parses the name
+ // as-is rather than prepending the current database.
+ session.setDatabaseName(null);
+ } else {
+ session.setDatabaseName(database);
+ }
+ session.setSqlDialect(IClientSession.SqlDialect.TABLE);
+
+ final SqlParser parser = new SqlParser();
+ final Statement statement = parser.createStatement(sql, session.getZoneId(), session);
+ final Metadata metadata = LocalExecutionPlanner.getInstance().metadata;
+
+ final ExecutionResult result =
+ Coordinator.getInstance()
+ .executeForTableModel(
+ statement,
+ parser,
+ session,
+ queryId,
+ sessionManager.getSessionInfo(session),
+ sql,
+ metadata,
+ CONFIG.getQueryTimeoutThreshold(),
+ false,
+ false);
+ final TSStatus status = result.status;
+ final int code = status.getCode();
+ if (code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+ // 602 = DATABASE_ALREADY_EXISTS, 507 = TABLE_ALREADY_EXISTS in IF NOT EXISTS races
+ && code != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()
+ && code != TSStatusCode.TABLE_ALREADY_EXISTS.getStatusCode()) {
+ LOGGER.warn(
+ "OTLP schema init DDL failed: sql=[{}], code={}, message={}",
+ sql,
+ code,
+ status.getMessage());
+ }
+ } catch (final Exception e) {
+ LOGGER.warn("OTLP schema init DDL threw: sql=[{}]", sql, e);
+ } finally {
+ Coordinator.getInstance().cleanupQueryExecution(queryId);
+ }
+ }
+}
diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpService.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpService.java
new file mode 100644
index 0000000000000..bc40e5edc7611
--- /dev/null
+++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpService.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rest.protocol.otlp.v1;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.rest.IoTDBRestServiceConfig;
+import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
+import org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.RestClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
+
+import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
+import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
+import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.ZoneId;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Singleton glue between the HTTP OTLP resources and the IoTDB coordinator.
+ *
+ * There is no single "OTLP database": the receiver routes each OTLP resource group to a database
+ * derived from its {@code service.name} resource attribute (see {@link
+ * OtlpConverter#deriveDatabaseName}), so traffic from {@code claude-code}, {@code codex}, {@code
+ * gemini} etc. lands in separate databases automatically.
+ *
+ *
Each database gets its own pinned {@link IClientSession} cached in {@link #sessionByDatabase},
+ * created lazily on first use along with its {@code traces / metrics / logs} tables. Pinning the
+ * database per session avoids races between concurrent requests that would otherwise share one
+ * session's {@code databaseName} field.
+ */
+public final class OtlpService {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(OtlpService.class);
+
+ private static final OtlpService INSTANCE = new OtlpService();
+
+ private final IoTDBRestServiceConfig config =
+ IoTDBRestServiceDescriptor.getInstance().getConfig();
+
+ private final ConcurrentHashMap sessionByDatabase =
+ new ConcurrentHashMap<>();
+
+ private OtlpService() {}
+
+ public static OtlpService getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Returns a session pinned to {@code database}, creating it and its OTLP tables on first call.
+ * Safe to invoke from any request thread; the expensive login + DDL run at most once per database
+ * for the lifetime of the process.
+ */
+ IClientSession sessionFor(final String database) {
+ final IClientSession existing = sessionByDatabase.get(database);
+ if (existing != null) {
+ return existing;
+ }
+ return sessionByDatabase.computeIfAbsent(database, this::openDatabaseSession);
+ }
+
+ private IClientSession openDatabaseSession(final String database) {
+ final IClientSession session = login();
+ if (session == null) {
+ throw new IllegalStateException(
+ "OTLP receiver failed to log in as user '"
+ + config.getOtlpUsername()
+ + "'. Check otlp_username / otlp_password in iotdb-system.properties.");
+ }
+ OtlpSchemaInitializer.initialize(database, session);
+ // After schema init the session's databaseName is pinned to `database`; subsequent inserts
+ // on this session all target it.
+ session.setDatabaseName(database);
+ session.setSqlDialect(IClientSession.SqlDialect.TABLE);
+ LOGGER.info(
+ "OTLP receiver ready for database={} (user={})", database, config.getOtlpUsername());
+ return session;
+ }
+
+ private IClientSession login() {
+ final SessionManager sm = SessionManager.getInstance();
+ // Use 127.0.0.1 as the client address so LoginLockManager's localhost check succeeds without
+ // triggering a DNS lookup on a synthetic UUID (which logs a spurious UnknownHostException).
+ final RestClientSession session = new RestClientSession("127.0.0.1");
+ session.setUsername(config.getOtlpUsername());
+ final BasicOpenSessionResp resp =
+ sm.login(
+ session,
+ config.getOtlpUsername(),
+ config.getOtlpPassword(),
+ ZoneId.systemDefault().toString(),
+ TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3,
+ IoTDBConstant.ClientVersion.V_1_0,
+ IClientSession.SqlDialect.TABLE);
+ if (resp.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn(
+ "OTLP login failed: user={}, code={}, message={}",
+ config.getOtlpUsername(),
+ resp.getCode(),
+ resp.getMessage());
+ return null;
+ }
+ return session;
+ }
+
+ /** Ingests an OTLP trace export request. Returns true on all-success. */
+ public boolean ingestTraces(final ExportTraceServiceRequest request) {
+ return OtlpTracesConverter.convertAndInsert(this, request);
+ }
+
+ /** Ingests an OTLP metrics export request. Returns true on all-success. */
+ public boolean ingestMetrics(final ExportMetricsServiceRequest request) {
+ return OtlpMetricsConverter.convertAndInsert(this, request);
+ }
+
+ /** Ingests an OTLP logs export request. Returns true on all-success. */
+ public boolean ingestLogs(final ExportLogsServiceRequest request) {
+ return OtlpLogsConverter.convertAndInsert(this, request);
+ }
+}
diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpTracesConverter.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpTracesConverter.java
new file mode 100644
index 0000000000000..0160ec6daba87
--- /dev/null
+++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpTracesConverter.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rest.protocol.otlp.v1;
+
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.rest.protocol.otlp.v1.OtlpIngestor.OtlpTableBatch;
+
+import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
+import io.opentelemetry.proto.trace.v1.ResourceSpans;
+import io.opentelemetry.proto.trace.v1.ScopeSpans;
+import io.opentelemetry.proto.trace.v1.Span;
+import org.apache.tsfile.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Flattens OTLP trace export requests into rows for each service's {@code traces} table. */
+final class OtlpTracesConverter {
+
+ private static final String TABLE = "traces";
+
+ // TAG(2) + ATTRIBUTE(3) + FIELD(12) = 17 columns
+ private static final String[] COLUMN_NAMES = {
+ "service_name",
+ "span_name",
+ "service_version",
+ "os_type",
+ "host_arch",
+ "trace_id",
+ "span_id",
+ "parent_span_id",
+ "span_kind",
+ "start_time_unix_nano",
+ "end_time_unix_nano",
+ "duration_nano",
+ "status_code",
+ "status_message",
+ "attributes",
+ "resource_attributes",
+ "scope_name",
+ "scope_version"
+ };
+ private static final TSDataType[] DATA_TYPES = {
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.INT64,
+ TSDataType.INT64,
+ TSDataType.INT64,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING
+ };
+ private static final TsTableColumnCategory[] COLUMN_CATEGORIES = {
+ TsTableColumnCategory.TAG,
+ TsTableColumnCategory.TAG,
+ TsTableColumnCategory.ATTRIBUTE,
+ TsTableColumnCategory.ATTRIBUTE,
+ TsTableColumnCategory.ATTRIBUTE,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD
+ };
+
+ private OtlpTracesConverter() {}
+
+ static boolean convertAndInsert(
+ final OtlpService service, final ExportTraceServiceRequest request) {
+ final Map> byDatabase = new HashMap<>();
+ for (final ResourceSpans rs : request.getResourceSpansList()) {
+ final String db =
+ OtlpConverter.deriveDatabaseName(
+ OtlpConverter.extractServiceName(rs.getResource().getAttributesList()));
+ byDatabase.computeIfAbsent(db, k -> new ArrayList<>()).add(rs);
+ }
+
+ boolean allOk = true;
+ for (final Map.Entry> entry : byDatabase.entrySet()) {
+ if (!insertForDatabase(service, entry.getKey(), entry.getValue())) {
+ allOk = false;
+ }
+ }
+ return allOk;
+ }
+
+ private static boolean insertForDatabase(
+ final OtlpService service, final String database, final List resourceSpans) {
+ int capacity = 0;
+ for (final ResourceSpans rs : resourceSpans) {
+ for (final ScopeSpans ss : rs.getScopeSpansList()) {
+ capacity += ss.getSpansCount();
+ }
+ }
+ if (capacity == 0) {
+ return true;
+ }
+ final IClientSession session = service.sessionFor(database);
+ final OtlpTableBatch batch =
+ new OtlpTableBatch(TABLE, COLUMN_NAMES, DATA_TYPES, COLUMN_CATEGORIES, capacity);
+
+ for (final ResourceSpans rs : resourceSpans) {
+ final java.util.List resAttrs =
+ rs.getResource().getAttributesList();
+ final String serviceName = OtlpConverter.extractServiceName(resAttrs);
+ final String serviceVersion = OtlpConverter.extractAttribute(resAttrs, "service.version");
+ final String osType = OtlpConverter.extractAttribute(resAttrs, "os.type");
+ final String hostArch = OtlpConverter.extractAttribute(resAttrs, "host.arch");
+ final String resourceAttrsJson = OtlpConverter.attributesToJson(resAttrs);
+ for (final ScopeSpans ss : rs.getScopeSpansList()) {
+ final String scopeName = ss.getScope().getName();
+ final String scopeVersion = ss.getScope().getVersion();
+ for (final Span span : ss.getSpansList()) {
+ final long startNano = span.getStartTimeUnixNano();
+ final long endNano = span.getEndTimeUnixNano();
+ batch.startRow(OtlpConverter.nanoToDbPrecision(startNano));
+ int c = 0;
+ // TAGs
+ batch.setString(c++, serviceName);
+ batch.setString(c++, span.getName());
+ // ATTRIBUTEs
+ batch.setString(c++, serviceVersion);
+ batch.setString(c++, osType);
+ batch.setString(c++, hostArch);
+ // FIELDs
+ batch.setString(c++, OtlpConverter.bytesToHex(span.getTraceId()));
+ batch.setString(c++, OtlpConverter.bytesToHex(span.getSpanId()));
+ batch.setString(c++, OtlpConverter.bytesToHex(span.getParentSpanId()));
+ batch.setString(c++, span.getKind().name());
+ batch.setLong(c++, startNano);
+ batch.setLong(c++, endNano);
+ batch.setLong(c++, endNano - startNano);
+ batch.setString(c++, statusCode(span));
+ batch.setString(c++, span.getStatus().getMessage());
+ batch.setString(c++, OtlpConverter.attributesToJson(span.getAttributesList()));
+ batch.setString(c++, resourceAttrsJson);
+ batch.setString(c++, scopeName);
+ batch.setString(c, scopeVersion);
+ }
+ }
+ }
+ return OtlpIngestor.insert(database, session, batch);
+ }
+
+ private static String statusCode(final Span span) {
+ // Status is a top-level proto type in opentelemetry-proto, not nested in Span.
+ // OTLP status codes: STATUS_CODE_UNSET = 0, STATUS_CODE_OK = 1, STATUS_CODE_ERROR = 2.
+ if (!span.hasStatus()) {
+ return "UNSET";
+ }
+ final io.opentelemetry.proto.trace.v1.Status status = span.getStatus();
+ switch (status.getCode()) {
+ case STATUS_CODE_OK:
+ return "OK";
+ case STATUS_CODE_ERROR:
+ return "ERROR";
+ case STATUS_CODE_UNSET:
+ case UNRECOGNIZED:
+ default:
+ return "UNSET";
+ }
+ }
+}
diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpTracesResource.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpTracesResource.java
new file mode 100644
index 0000000000000..06e21051fe22a
--- /dev/null
+++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/otlp/v1/OtlpTracesResource.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rest.protocol.otlp.v1;
+
+import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
+import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+
+/**
+ * OTLP/HTTP traces endpoint. OpenTelemetry fixes the signal suffix to {@code /v1/traces}; IoTDB
+ * serves the base under {@code /rest/otlp} so the full URL is {@code /rest/otlp/v1/traces} and
+ * clients configure {@code OTEL_EXPORTER_OTLP_ENDPOINT=http://:18080/rest/otlp}.
+ */
+@Path("/rest/v1/otlp/v1/traces")
+public class OtlpTracesResource {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(OtlpTracesResource.class);
+
+ @POST
+ public Response export(@Context final HttpHeaders headers, final byte[] body) {
+ final boolean protobuf = OtlpHttp.isProtobuf(headers);
+ try {
+ final ExportTraceServiceRequest request =
+ OtlpHttp.parse(body, ExportTraceServiceRequest.newBuilder(), protobuf).build();
+ final boolean ok = OtlpService.getInstance().ingestTraces(request);
+ if (!ok) {
+ return OtlpHttp.partialFailure(protobuf, "OTLP trace insert failed");
+ }
+ return OtlpHttp.success(ExportTraceServiceResponse.getDefaultInstance(), protobuf);
+ } catch (final Exception e) {
+ LOGGER.warn("OTLP trace export failed", e);
+ return OtlpHttp.badRequest(protobuf, e.getMessage());
+ }
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/rest/IoTDBRestServiceConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/rest/IoTDBRestServiceConfig.java
index 64c0f65fe3034..c720c3815274c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/rest/IoTDBRestServiceConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/rest/IoTDBRestServiceConfig.java
@@ -62,6 +62,12 @@ public class IoTDBRestServiceConfig {
/** Is client authentication required. */
private boolean clientAuth = false;
+ /** IoTDB user the OTLP receiver logs in as when writing telemetry data. */
+ private String otlpUsername = "root";
+
+ /** Password paired with {@link #otlpUsername}. */
+ private String otlpPassword = "root";
+
public boolean isClientAuth() {
return clientAuth;
}
@@ -173,4 +179,20 @@ public int getRestQueryDefaultRowSizeLimit() {
public void setRestQueryDefaultRowSizeLimit(int restQueryDefaultRowSizeLimit) {
this.restQueryDefaultRowSizeLimit = restQueryDefaultRowSizeLimit;
}
+
+ public String getOtlpUsername() {
+ return otlpUsername;
+ }
+
+ public void setOtlpUsername(String otlpUsername) {
+ this.otlpUsername = otlpUsername;
+ }
+
+ public String getOtlpPassword() {
+ return otlpPassword;
+ }
+
+ public void setOtlpPassword(String otlpPassword) {
+ this.otlpPassword = otlpPassword;
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/rest/IoTDBRestServiceDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/rest/IoTDBRestServiceDescriptor.java
index c4f9d131de958..3d9a602c690be 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/rest/IoTDBRestServiceDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/rest/IoTDBRestServiceDescriptor.java
@@ -111,6 +111,8 @@ private void loadProps(TrimProperties trimProperties) {
Integer.parseInt(
trimProperties.getProperty(
"idle_timeout_in_seconds", Integer.toString(conf.getIdleTimeoutInSeconds()))));
+ conf.setOtlpUsername(trimProperties.getProperty("otlp_username", conf.getOtlpUsername()));
+ conf.setOtlpPassword(trimProperties.getProperty("otlp_password", conf.getOtlpPassword()));
}
/**
diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 378a6226cbffd..7a7f3314aa047 100644
--- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -594,6 +594,33 @@ client_auth=false
# Datatype: int
idle_timeout_in_seconds=50000
+####################
+### OpenTelemetry (OTLP) Receiver configuration
+####################
+# The OTLP receiver is exposed under the REST service on port rest_service_port at base
+# path /rest/v1/otlp. It accepts OTLP/HTTP requests at:
+# POST /rest/v1/otlp/v1/traces
+# POST /rest/v1/otlp/v1/metrics
+# POST /rest/v1/otlp/v1/logs
+# Both application/x-protobuf (default OTLP encoding) and application/json are accepted.
+# The receiver is only active while enable_rest_service=true.
+#
+# Each OTLP request's resource attribute `service.name` is translated into a database name
+# (e.g. "claude-code" -> claude_code, "codex" -> codex, "Gemini CLI" -> gemini_cli; missing
+# service.name falls back to `unknown_service`). Database and tables are created on first
+# use of each distinct service.
+
+# IoTDB user the OTLP receiver logs in as when writing telemetry data.
+# effectiveMode: restart
+# Datatype: String
+otlp_username=root
+
+# Password paired with otlp_username. Leave the default if you are running an unchanged
+# root/root installation; override in production.
+# effectiveMode: restart
+# Datatype: String
+otlp_password=root
+
####################
### Load balancing configuration
####################