diff --git a/docs-mintlify/admin/connect-to-data/data-sources/ksqldb.mdx b/docs-mintlify/admin/connect-to-data/data-sources/ksqldb.mdx index c78f6d2210f83..cb482f0f2429f 100644 --- a/docs-mintlify/admin/connect-to-data/data-sources/ksqldb.mdx +++ b/docs-mintlify/admin/connect-to-data/data-sources/ksqldb.mdx @@ -332,6 +332,9 @@ cubes: - CUBE.status unique_key_columns: - order_id + - user_id + - status + - created_at_second time_dimension: CUBE.created_at granularity: second partition_granularity: day @@ -348,6 +351,7 @@ cubes: columns: - CUBE.user_id - CUBE.status + - orders__created_at_second stream_offset: latest output_column_types: - member: CUBE.order_id @@ -356,6 +360,8 @@ cubes: type: text - member: CUBE.status type: text + - member: CUBE.created_at.second + type: timestamp - member: CUBE.count type: int - member: CUBE.total_amount @@ -506,7 +512,12 @@ cube("order_events_stream", { read_only: true, measures: [CUBE.count, CUBE.total_amount, CUBE.failed_count], dimensions: [CUBE.order_id, CUBE.user_id, CUBE.status], - unique_key_columns: [`order_id`], + unique_key_columns: [ + `order_id`, + `user_id`, + `status`, + `created_at_second`, + ], time_dimension: CUBE.created_at, granularity: `second`, partition_granularity: `day`, @@ -523,7 +534,7 @@ cube("order_events_stream", { }, indexes: { user_status: { - columns: [CUBE.user_id, CUBE.status], + columns: [CUBE.user_id, CUBE.status, `orders__created_at_second`], }, }, stream_offset: `latest`, @@ -531,6 +542,7 @@ cube("order_events_stream", { { member: CUBE.order_id, type: `text` }, { member: CUBE.user_id, type: `text` }, { member: CUBE.status, type: `text` }, + { member: CUBE.created_at.second, type: `timestamp` }, { member: CUBE.count, type: `int` }, { member: CUBE.total_amount, type: `decimal` }, { member: CUBE.failed_count, type: `int` }, @@ -617,6 +629,29 @@ sequence column (`__seq`) to the table, populated from the Kafka partition offset. 
The unique key columns together with `__seq` form the sort key for all indexes on this table. +Entries in `unique_key_columns` are strings, not member references. Use +the dimension's own name (for example, `user_id`). To include the time +dimension as part of the unique key, use the form +`<time_dimension>_<granularity>` — for example, `created_at_second` +when `granularity` is `second`. + + + +The naming convention for the time dimension inside `unique_key_columns` +is **not** the same as the column name used in +[`indexes`](/reference/data-modeling/pre-aggregations#indexes). Indexes +expect the fully-qualified alias from the generated `select_statement` +(`<cube>__<time_dimension>_<granularity>`), while +`unique_key_columns` expects only `<time_dimension>_<granularity>`. + + + +If the pre-aggregation defines `indexes`, every dimension referenced by +any index must also be listed in `unique_key_columns`. Cube Store uses +`unique_key_columns` (plus `__seq`) as the sort key for all indexes, so +an index column that is not part of the unique key cannot be sorted on +during compaction and the pre-aggregation build will fail. + Deduplication is not applied at ingestion time — all incoming records are appended as they arrive. Instead, Cube Store deduplicates during **reads** and **compaction**: rows are sorted by the unique key columns @@ -688,6 +723,8 @@ pre_aggregations: type: text - member: CUBE.status type: text + - member: CUBE.created_at.second + type: timestamp - member: CUBE.count type: int - member: CUBE.total_amount @@ -723,6 +760,7 @@ pre_aggregations: { { member: CUBE.order_id, type: `text` }, { member: CUBE.user_id, type: `text` }, { member: CUBE.status, type: `text` }, + { member: CUBE.created_at.second, type: `timestamp` }, { member: CUBE.count, type: `int` }, { member: CUBE.total_amount, type: `decimal` }, { member: CUBE.failed_count, type: `int` }, @@ -736,13 +774,16 @@ pre_aggregations: { Each entry in `output_column_types` has two properties: - `member` — a reference to a dimension or measure included in the - pre-aggregation. 
+ pre-aggregation. This must be a `CUBE` member reference (for example, + `CUBE.user_id`), not a string. For the time dimension, reference it + with the rollup granularity attached — for example, + `CUBE.created_at.second` when `granularity` is `second`. - `type` — the Cube Store column type. Common values: `text`, `int`, `bigint`, `decimal`, `float`, `boolean`, `timestamp`. -The time dimension used in `time_dimension` does not need an entry in -`output_column_types` — its type is always `timestamp` and is set -automatically. +Include an entry for every dimension and measure in the pre-aggregation. +The time dimension must also be listed, with `type: timestamp` and the +granularity suffix on the member reference as shown above. When `output_column_types` is defined, Cube uses the aliased column names (matching the `select_statement`) for the Cube Store table @@ -823,6 +864,46 @@ expression chain to truncate timestamps to the configured granularity natively on each micro-batch during ingestion. Standard SQL functions like `date_trunc` are also available in the `select_statement`. +##### Converting `bigint` timestamps + +When a source column stores a timestamp as a `bigint` (a common pattern +in Kafka topics — for example, a `created_at` field with values like +`1778800167128`), the value must be converted to a timestamp before it +can be used as a time dimension. + +The conversion depends on the unit of the `bigint` value: + +- **Milliseconds** (most common, e.g. `Date.now()` in JavaScript, Java's + `System.currentTimeMillis()`) — multiply by `1000` before casting, + because `CAST(... AS TIMESTAMP(6))` interprets numeric input as + **microseconds**. Without the multiplication, a millisecond value like + `1778800167128` is read as microseconds and produces a timestamp in + 1970 instead of the intended date. +- **Microseconds** — cast directly to `TIMESTAMP(6)`. +- **Seconds** — multiply by `1000000` before casting. 
+ +Apply the conversion in the time dimension's `sql` property: + +```javascript +dimensions: { + created_at: { + sql: `CAST(${CUBE}.created_at * 1000 AS TIMESTAMP(6))`, + type: `time` + } +} +``` + +The cast runs first; any granularity truncation generated by Cube +(`PARSE_TIMESTAMP(FORMAT_TIMESTAMP(CONVERT_TZ(...)))`) then operates on +the resulting `TIMESTAMP(6)` value. + + + When debugging a `bigint` timestamp issue, temporarily remove + `partition_granularity` from the pre-aggregation. This skips + partitioned builds and makes it easier to verify the conversion is + correct before re-enabling partitioning. + + ### Filtering on the stream When the streaming cube defines a `sql` property with a `SELECT`