Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 87 additions & 6 deletions docs-mintlify/admin/connect-to-data/data-sources/ksqldb.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,9 @@ cubes:
- CUBE.status
unique_key_columns:
- order_id
- user_id
- status
- created_at_second
time_dimension: CUBE.created_at
granularity: second
partition_granularity: day
Expand All @@ -348,6 +351,7 @@ cubes:
columns:
- CUBE.user_id
- CUBE.status
- orders__created_at_second
stream_offset: latest
output_column_types:
- member: CUBE.order_id
Expand All @@ -356,6 +360,8 @@ cubes:
type: text
- member: CUBE.status
type: text
- member: CUBE.created_at.second
type: timestamp
- member: CUBE.count
type: int
- member: CUBE.total_amount
Expand Down Expand Up @@ -506,7 +512,12 @@ cube("order_events_stream", {
read_only: true,
measures: [CUBE.count, CUBE.total_amount, CUBE.failed_count],
dimensions: [CUBE.order_id, CUBE.user_id, CUBE.status],
unique_key_columns: [`order_id`],
unique_key_columns: [
`order_id`,
`user_id`,
`status`,
`created_at_second`,
],
time_dimension: CUBE.created_at,
granularity: `second`,
partition_granularity: `day`,
Expand All @@ -523,14 +534,15 @@ cube("order_events_stream", {
},
indexes: {
user_status: {
columns: [CUBE.user_id, CUBE.status],
columns: [CUBE.user_id, CUBE.status, `orders__created_at_second`],
},
},
stream_offset: `latest`,
output_column_types: [
{ member: CUBE.order_id, type: `text` },
{ member: CUBE.user_id, type: `text` },
{ member: CUBE.status, type: `text` },
{ member: CUBE.created_at.second, type: `timestamp` },
{ member: CUBE.count, type: `int` },
{ member: CUBE.total_amount, type: `decimal` },
{ member: CUBE.failed_count, type: `int` },
Expand Down Expand Up @@ -617,6 +629,29 @@ sequence column (`__seq`) to the table, populated from the Kafka
partition offset. The unique key columns together with `__seq` form the
sort key for all indexes on this table.

Entries in `unique_key_columns` are strings, not member references. Use
the dimension's own name (for example, `user_id`). To include the time
dimension as part of the unique key, use the form
`<time_dimension_name>_<granularity>` — for example, `created_at_second`
when `granularity` is `second`.

<Warning>

The naming convention for the time dimension inside `unique_key_columns`
is **not** the same as the column name used in
[`indexes`](/reference/data-modeling/pre-aggregations#indexes). Indexes
expect the fully-qualified alias from the generated `select_statement`
(`<cube_name>__<time_dimension_name>_<granularity>`), while
`unique_key_columns` expects only `<time_dimension_name>_<granularity>`.

</Warning>

If the pre-aggregation defines `indexes`, every dimension referenced by
any index must also be listed in `unique_key_columns`. Cube Store uses
`unique_key_columns` (plus `__seq`) as the sort key for all indexes, so
an index column that is not part of the unique key cannot be sorted on
during compaction and the pre-aggregation build will fail.

Deduplication is not applied at ingestion time — all incoming records are
appended as they arrive. Instead, Cube Store deduplicates during
**reads** and **compaction**: rows are sorted by the unique key columns
Expand Down Expand Up @@ -688,6 +723,8 @@ pre_aggregations:
type: text
- member: CUBE.status
type: text
- member: CUBE.created_at.second
type: timestamp
- member: CUBE.count
type: int
- member: CUBE.total_amount
Expand Down Expand Up @@ -723,6 +760,7 @@ pre_aggregations: {
{ member: CUBE.order_id, type: `text` },
{ member: CUBE.user_id, type: `text` },
{ member: CUBE.status, type: `text` },
{ member: CUBE.created_at.second, type: `timestamp` },
{ member: CUBE.count, type: `int` },
{ member: CUBE.total_amount, type: `decimal` },
{ member: CUBE.failed_count, type: `int` },
Expand All @@ -736,13 +774,16 @@ pre_aggregations: {
Each entry in `output_column_types` has two properties:

- `member` — a reference to a dimension or measure included in the
pre-aggregation.
pre-aggregation. This must be a `CUBE` member reference (for example,
`CUBE.user_id`), not a string. For the time dimension, reference it
with the rollup granularity attached — for example,
`CUBE.created_at.second` when `granularity` is `second`.
- `type` — the Cube Store column type. Common values: `text`, `int`,
`bigint`, `decimal`, `float`, `boolean`, `timestamp`.

The time dimension used in `time_dimension` does not need an entry in
`output_column_types` — its type is always `timestamp` and is set
automatically.
Include an entry for every dimension and measure in the pre-aggregation.
The time dimension must also be listed, with `type: timestamp` and the
granularity suffix on the member reference as shown above.

When `output_column_types` is defined, Cube uses the aliased column
names (matching the `select_statement`) for the Cube Store table
Expand Down Expand Up @@ -823,6 +864,46 @@ expression chain to truncate timestamps to the configured granularity
natively on each micro-batch during ingestion. Standard SQL functions
like `date_trunc` are also available in the `select_statement`.

##### Converting `bigint` timestamps

When a source column stores a timestamp as a `bigint` (a common pattern
in Kafka topics — for example, a `created_at` field with values like
`1778800167128`), the value must be converted to a timestamp before it
can be used as a time dimension.

The conversion depends on the unit of the `bigint` value:

- **Milliseconds** (most common, e.g. `Date.now()` in JavaScript, Java's
`System.currentTimeMillis()`) — multiply by `1000` before casting,
because `CAST(... AS TIMESTAMP(6))` interprets numeric input as
**microseconds**. Without the multiplication, a millisecond value like
`1778800167128` is read as microseconds and produces a timestamp in
1970 instead of the intended date.
- **Microseconds** — cast directly to `TIMESTAMP(6)`.
- **Seconds** — multiply by `1000000` before casting.

Apply the conversion in the time dimension's `sql` property:

```javascript
dimensions: {
created_at: {
sql: `CAST(${CUBE}.created_at * 1000 AS TIMESTAMP(6))`,
type: `time`
}
}
```

The cast runs first; any granularity truncation generated by Cube
(`PARSE_TIMESTAMP(FORMAT_TIMESTAMP(CONVERT_TZ(...)))`) then operates on
the resulting `TIMESTAMP(6)` value.

<Tip>
When debugging a `bigint` timestamp issue, temporarily remove
`partition_granularity` from the pre-aggregation. This skips
partitioned builds and makes it easier to verify the conversion is
correct before re-enabling partitioning.
</Tip>

### Filtering on the stream

When the streaming cube defines a `sql` property with a `SELECT`
Expand Down
Loading