diff --git a/biome.json b/biome.json index 1cfbd09a..e55fa69d 100644 --- a/biome.json +++ b/biome.json @@ -7,7 +7,7 @@ "@lokalise/biome-config/configs/biome-package.jsonc" ], "files": { - "includes": ["**", "!.turbo", "!**/dist", "!**/coverage"] + "includes": ["**", "!.turbo", "!**/dist", "!**/coverage", "!**/load-tests"] }, "linter": { "rules": { diff --git a/packages/kafka/load-tests/.gitattributes b/packages/kafka/load-tests/.gitattributes new file mode 100644 index 00000000..0309e02d --- /dev/null +++ b/packages/kafka/load-tests/.gitattributes @@ -0,0 +1 @@ +scripts/*.sh text eol=lf diff --git a/packages/kafka/load-tests/README.md b/packages/kafka/load-tests/README.md new file mode 100644 index 00000000..63faa333 --- /dev/null +++ b/packages/kafka/load-tests/README.md @@ -0,0 +1,114 @@ +# Kafka Load Tests + +Load tests for `@message-queue-toolkit/kafka` measuring throughput, latency, and backlog under configurable load. + +Two producer modes and two consumer modes can be combined: + +- **CDC**: CockroachDB inserts → CDC changefeed → Kafka (realistic end-to-end) +- **Direct**: `AbstractKafkaPublisher` → Kafka (isolates Kafka consumer performance) +- **Single**: One message at a time (`batchProcessingEnabled: false`) +- **Batch**: Batched consumption via `KafkaMessageBatchStream` (`batchProcessingEnabled: true`) + +## Prerequisites + +- Docker & Docker Compose +- Node.js >= 22.14.0 + +## Quick Start + +```bash +# Install dependencies +npm install + +# Start infrastructure (Kafka + CockroachDB + CDC changefeed) +npm run docker:start + +# Stop and clean up +npm run docker:stop +``` + +## Test Scripts + +### CDC (CockroachDB → CDC → Kafka) + +```bash +# Single-message consumer +npm run load:cdc:light # 100 rows/sec, 30s +npm run load:cdc:medium # 1000 rows/sec, 60s +npm run load:cdc:heavy # 5000 rows/sec, 120s +npm run load:cdc -- --rate 500 --duration 45 --batch 50 + +# Batch consumer +npm run load:cdc:batch:light # 100 rows/sec, 30s +npm run load:cdc:batch:medium 
# 1000 rows/sec, 60s +npm run load:cdc:batch:heavy # 5000 rows/sec, 120s +npm run load:cdc:batch -- --rate 500 --consumer-batch 100 --consumer-timeout 500 +``` + +### Direct (Kafka publisher → Kafka) + +```bash +# Single-message consumer +npm run load:direct:light # 100 msgs/sec, 30s +npm run load:direct:medium # 1000 msgs/sec, 60s +npm run load:direct:heavy # 5000 msgs/sec, 120s +npm run load:direct -- --rate 500 --duration 45 --batch 50 + +# Batch consumer +npm run load:direct:batch:light # 100 msgs/sec, 30s +npm run load:direct:batch:medium # 1000 msgs/sec, 60s +npm run load:direct:batch:heavy # 5000 msgs/sec, 120s +npm run load:direct:batch -- --rate 500 --consumer-batch 100 --consumer-timeout 500 +``` + +## CLI Options + +### Common + +| Flag | Short | Default | Description | +|------|-------|---------|-------------| +| `--rate` | `-r` | 1000 | Target produce rate (rows or msgs per sec) | +| `--duration` | `-d` | 60 | Test duration (seconds) | +| `--batch` | `-b` | 100 | Producer batch size | + +### Batch consumer only + +| Flag | Default | Description | +|------|---------|-------------| +| `--consumer-batch` | 50 | Messages per consumer batch | +| `--consumer-timeout` | 200 | Batch flush timeout (ms) | + +## Architecture + +### CDC mode + +``` +Load Generator → CockroachDB (inserts) → CDC Changefeed → Kafka → Consumer → Metrics +``` + +1. **CockroachDB** tables (`events`, `orders`) with CDC changefeed targeting Kafka +2. **Load generator** inserts rows into CRDB at configurable rate with fire-and-forget concurrency +3. **CDC changefeed** publishes row changes to Kafka topics +4. **Consumer** (single or batch) processes messages and records metrics + +### Direct mode + +``` +Load Generator → AbstractKafkaPublisher → Kafka → Consumer → Metrics +``` + +1. **Publisher** sends messages directly to Kafka topics (`direct-events`, `direct-orders`) +2. 
**Consumer** (single or batch) processes messages and records metrics + +## Services + +| Service | Port | Description | +|---------|------|-------------| +| Kafka | 9092 | Message broker (KRaft, 6 partitions) | +| CockroachDB | 26257 | SQL database | +| CockroachDB UI | 8181 | DB admin console | +| Kafka UI | 8080 | Topic browser | + +## Latency Measurement + +Each `events` row embeds `{"loadtest_ts": }` in its payload. The consumer extracts this timestamp and computes end-to-end latency (insert/publish → consume). Reported as avg, p50, p95, p99. diff --git a/packages/kafka/load-tests/docker-compose.yml b/packages/kafka/load-tests/docker-compose.yml new file mode 100644 index 00000000..b6df59b4 --- /dev/null +++ b/packages/kafka/load-tests/docker-compose.yml @@ -0,0 +1,67 @@ +services: + + kafka: + image: apache/kafka:3.7.1 + ports: + - 9092:9092 + environment: + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_LISTENERS: LOCAL://0.0.0.0:9092,DOCKER://kafka:9093,CONTROLLER://localhost:9094 + KAFKA_ADVERTISED_LISTENERS: LOCAL://localhost:9092,DOCKER://kafka:9093 + KAFKA_INTER_BROKER_LISTENER_NAME: LOCAL + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,LOCAL:PLAINTEXT,DOCKER:PLAINTEXT + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9094 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_NUM_PARTITIONS: 6 + healthcheck: + test: /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list + interval: 5s + timeout: 10s + retries: 10 + start_period: 15s + restart: on-failure + + cockroachdb: + image: cockroachdb/cockroach:latest-v25.1 + command: start-single-node --insecure + ports: + - 26257:26257 + - 8181:8080 + depends_on: + kafka: + condition: service_healthy + healthcheck: + test: curl -f http://localhost:8080/health?ready=1 || exit 1 + interval: 
5s + timeout: 5s + retries: 10 + start_period: 10s + restart: on-failure + + crdb-init: + image: cockroachdb/cockroach:latest-v25.1 + volumes: + - ./scripts/init-crdb.sh:/init-crdb.sh + entrypoint: ["/bin/bash", "/init-crdb.sh"] + depends_on: + cockroachdb: + condition: service_healthy + restart: "no" + + kafka-ui: + image: provectuslabs/kafka-ui:latest + ports: + - 8080:8080 + depends_on: + kafka: + condition: service_healthy + environment: + DYNAMIC_CONFIG_ENABLED: 'true' + KAFKA_CLUSTERS_0_NAME: LoadTest + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093 + restart: on-failure diff --git a/packages/kafka/load-tests/package-lock.json b/packages/kafka/load-tests/package-lock.json new file mode 100644 index 00000000..0b713db7 --- /dev/null +++ b/packages/kafka/load-tests/package-lock.json @@ -0,0 +1,1207 @@ +{ + "name": "@message-queue-toolkit/kafka-load-tests", + "version": "0.0.0", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "@message-queue-toolkit/kafka-load-tests", + "version": "0.0.0", + "dependencies": { + "@lokalise/node-core": "^14.2.0", + "@message-queue-toolkit/core": "file:../../core", + "@message-queue-toolkit/kafka": "file:../../kafka", + "@message-queue-toolkit/schemas": "file:../../schemas", + "@platformatic/kafka": "1.28.0", + "pg": "^8.19.0", + "zod": "^4.0.17" + }, + "devDependencies": { + "@types/pg": "^8.16.0", + "typescript": "^5.9.3" + } + }, + "..": { + "name": "@message-queue-toolkit/kafka", + "version": "0.9.1", + "license": "MIT", + "dependencies": { + "@lokalise/node-core": "^14.2.0", + "@lokalise/universal-ts-utils": "^4.5.1", + "@platformatic/kafka": "^1.26.0" + }, + "devDependencies": { + "@biomejs/biome": "^2.3.2", + "@lokalise/biome-config": "^3.1.0", + "@lokalise/tsconfig": "^3.0.0", + "@message-queue-toolkit/core": ">=23.0.0", + "@message-queue-toolkit/schemas": ">=7.0.0", + "@types/node": "^25.0.2", + "@vitest/coverage-v8": "^4.0.15", + "awilix": "^12.0.1", + "awilix-manager": "^6.0.0", + "rimraf": 
"^6.0.1", + "typescript": "^5.9.2", + "vitest": "^4.0.15", + "zod": "^4.0.17" + }, + "engines": { + "node": ">= 22.14.0" + }, + "peerDependencies": { + "@message-queue-toolkit/core": ">=23.0.0", + "@message-queue-toolkit/schemas": ">=7.0.0", + "zod": ">=3.25.76 <5.0.0" + } + }, + "../../core": { + "name": "@message-queue-toolkit/core", + "version": "25.3.0", + "license": "MIT", + "dependencies": { + "@lokalise/node-core": "^14.2.0", + "@lokalise/universal-ts-utils": "^4.5.1", + "@message-queue-toolkit/schemas": "^7.0.0", + "dot-prop": "^10.1.0", + "fast-equals": "^6.0.0", + "json-stream-stringify": "^3.1.6", + "tmp": "^0.2.3", + "toad-cache": "^3.7.0" + }, + "devDependencies": { + "@biomejs/biome": "^2.3.8", + "@lokalise/biome-config": "^3.1.0", + "@lokalise/tsconfig": "^3.0.0", + "@types/node": "^25.0.2", + "@types/tmp": "^0.2.6", + "@vitest/coverage-v8": "^4.0.15", + "awilix": "^12.0.5", + "awilix-manager": "^6.1.0", + "rimraf": "^6.0.1", + "typescript": "^5.9.2", + "vitest": "^4.0.15", + "zod": "^4.1.13" + }, + "peerDependencies": { + "zod": ">=3.25.76 <5.0.0" + } + }, + "../../schemas": { + "name": "@message-queue-toolkit/schemas", + "version": "7.1.0", + "license": "MIT", + "devDependencies": { + "@biomejs/biome": "^2.3.2", + "@lokalise/biome-config": "^3.1.0", + "@lokalise/tsconfig": "^3.0.0", + "@types/node": "^25.0.2", + "@vitest/coverage-v8": "^4.0.15", + "rimraf": "^6.0.1", + "typescript": "^5.9.2", + "vitest": "^4.0.15", + "zod": "^4.0.17" + }, + "peerDependencies": { + "zod": ">=3.25.67" + } + }, + "node_modules/@emnapi/core": { + "version": "1.8.1", + "resolved": "https://registry.npmjs.org/@emnapi/core/-/core-1.8.1.tgz", + "integrity": "sha512-AvT9QFpxK0Zd8J0jopedNm+w/2fIzvtPKPjqyw9jwvBaReTTqPBk9Hixaz7KbjimP+QNz605/XnjFcDAL2pqBg==", + "license": "MIT", + "optional": true, + "dependencies": { + "@emnapi/wasi-threads": "1.1.0", + "tslib": "^2.4.0" + } + }, + "node_modules/@emnapi/runtime": { + "version": "1.8.1", + "resolved": 
"https://registry.npmjs.org/@emnapi/runtime/-/runtime-1.8.1.tgz", + "integrity": "sha512-mehfKSMWjjNol8659Z8KxEMrdSJDDot5SXMq00dM8BN4o+CLNXQ0xH2V7EchNHV4RmbZLmmPdEaXZc5H2FXmDg==", + "license": "MIT", + "optional": true, + "dependencies": { + "tslib": "^2.4.0" + } + }, + "node_modules/@emnapi/wasi-threads": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@emnapi/wasi-threads/-/wasi-threads-1.1.0.tgz", + "integrity": "sha512-WI0DdZ8xFSbgMjR1sFsKABJ/C5OnRrjT06JXbZKexJGrDuPTzZdDYfFlsgcCXCyf+suG5QU2e/y1Wo2V/OapLQ==", + "license": "MIT", + "optional": true, + "dependencies": { + "tslib": "^2.4.0" + } + }, + "node_modules/@lokalise/node-core": { + "version": "14.7.4", + "resolved": "https://registry.npmjs.org/@lokalise/node-core/-/node-core-14.7.4.tgz", + "integrity": "sha512-ZBlqh25P4YHgHGTc8iON6FtrLwTyy0MbIu7BmiN4ClZI2M92ssvnIUf2PHHZALLHN9w3yrs67JyHw0X+yiam2Q==", + "license": "Apache-2.0", + "dependencies": { + "dot-prop": "6.0.1", + "pino": "^10.0.0", + "pino-pretty": "^13.1.1", + "tslib": "^2.8.1" + }, + "peerDependencies": { + "zod": ">=3.25.76 <5.0.0" + } + }, + "node_modules/@message-queue-toolkit/core": { + "resolved": "../../core", + "link": true + }, + "node_modules/@message-queue-toolkit/kafka": { + "resolved": "..", + "link": true + }, + "node_modules/@message-queue-toolkit/schemas": { + "resolved": "../../schemas", + "link": true + }, + "node_modules/@napi-rs/wasm-runtime": { + "version": "0.2.12", + "resolved": "https://registry.npmjs.org/@napi-rs/wasm-runtime/-/wasm-runtime-0.2.12.tgz", + "integrity": "sha512-ZVWUcfwY4E/yPitQJl481FjFo3K22D6qF0DuFH6Y/nbnE11GY5uguDxZMGXPQ8WQ0128MXQD7TnfHyK4oWoIJQ==", + "license": "MIT", + "optional": true, + "dependencies": { + "@emnapi/core": "^1.4.3", + "@emnapi/runtime": "^1.4.3", + "@tybys/wasm-util": "^0.10.0" + } + }, + "node_modules/@node-rs/crc32": { + "version": "1.10.6", + "resolved": "https://registry.npmjs.org/@node-rs/crc32/-/crc32-1.10.6.tgz", + "integrity": 
"sha512-+llXfqt+UzgoDzT9of5vPQPGqTAVCohU74I9zIBkNo5TH6s2P31DFJOGsJQKN207f0GHnYv5pV3wh3BCY/un/A==", + "license": "MIT", + "optional": true, + "engines": { + "node": ">= 10" + }, + "funding": { + "type": "github", + "url": "https://github.com/sponsors/Brooooooklyn" + }, + "optionalDependencies": { + "@node-rs/crc32-android-arm-eabi": "1.10.6", + "@node-rs/crc32-android-arm64": "1.10.6", + "@node-rs/crc32-darwin-arm64": "1.10.6", + "@node-rs/crc32-darwin-x64": "1.10.6", + "@node-rs/crc32-freebsd-x64": "1.10.6", + "@node-rs/crc32-linux-arm-gnueabihf": "1.10.6", + "@node-rs/crc32-linux-arm64-gnu": "1.10.6", + "@node-rs/crc32-linux-arm64-musl": "1.10.6", + "@node-rs/crc32-linux-x64-gnu": "1.10.6", + "@node-rs/crc32-linux-x64-musl": "1.10.6", + "@node-rs/crc32-wasm32-wasi": "1.10.6", + "@node-rs/crc32-win32-arm64-msvc": "1.10.6", + "@node-rs/crc32-win32-ia32-msvc": "1.10.6", + "@node-rs/crc32-win32-x64-msvc": "1.10.6" + } + }, + "node_modules/@node-rs/crc32-android-arm-eabi": { + "version": "1.10.6", + "resolved": "https://registry.npmjs.org/@node-rs/crc32-android-arm-eabi/-/crc32-android-arm-eabi-1.10.6.tgz", + "integrity": "sha512-vZAMuJXm3TpWPOkkhxdrofWDv+Q+I2oO7ucLRbXyAPmXFNDhHtBxbO1rk9Qzz+M3eep8ieS4/+jCL1Q0zacNMQ==", + "cpu": [ + "arm" + ], + "license": "MIT", + "optional": true, + "os": [ + "android" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@node-rs/crc32-android-arm64": { + "version": "1.10.6", + "resolved": "https://registry.npmjs.org/@node-rs/crc32-android-arm64/-/crc32-android-arm64-1.10.6.tgz", + "integrity": "sha512-Vl/JbjCinCw/H9gEpZveWCMjxjcEChDcDBM8S4hKay5yyoRCUHJPuKr4sjVDBeOm+1nwU3oOm6Ca8dyblwp4/w==", + "cpu": [ + "arm64" + ], + "license": "MIT", + "optional": true, + "os": [ + "android" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@node-rs/crc32-darwin-arm64": { + "version": "1.10.6", + "resolved": "https://registry.npmjs.org/@node-rs/crc32-darwin-arm64/-/crc32-darwin-arm64-1.10.6.tgz", + "integrity": 
"sha512-kARYANp5GnmsQiViA5Qu74weYQ3phOHSYQf0G+U5wB3NB5JmBHnZcOc46Ig21tTypWtdv7u63TaltJQE41noyg==", + "cpu": [ + "arm64" + ], + "license": "MIT", + "optional": true, + "os": [ + "darwin" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@node-rs/crc32-darwin-x64": { + "version": "1.10.6", + "resolved": "https://registry.npmjs.org/@node-rs/crc32-darwin-x64/-/crc32-darwin-x64-1.10.6.tgz", + "integrity": "sha512-Q99bevJVMfLTISpkpKBlXgtPUItrvTWKFyiqoKH5IvscZmLV++NH4V13Pa17GTBmv9n18OwzgQY4/SRq6PQNVA==", + "cpu": [ + "x64" + ], + "license": "MIT", + "optional": true, + "os": [ + "darwin" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@node-rs/crc32-freebsd-x64": { + "version": "1.10.6", + "resolved": "https://registry.npmjs.org/@node-rs/crc32-freebsd-x64/-/crc32-freebsd-x64-1.10.6.tgz", + "integrity": "sha512-66hpawbNjrgnS9EDMErta/lpaqOMrL6a6ee+nlI2viduVOmRZWm9Rg9XdGTK/+c4bQLdtC6jOd+Kp4EyGRYkAg==", + "cpu": [ + "x64" + ], + "license": "MIT", + "optional": true, + "os": [ + "freebsd" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@node-rs/crc32-linux-arm-gnueabihf": { + "version": "1.10.6", + "resolved": "https://registry.npmjs.org/@node-rs/crc32-linux-arm-gnueabihf/-/crc32-linux-arm-gnueabihf-1.10.6.tgz", + "integrity": "sha512-E8Z0WChH7X6ankbVm8J/Yym19Cq3otx6l4NFPS6JW/cWdjv7iw+Sps2huSug+TBprjbcEA+s4TvEwfDI1KScjg==", + "cpu": [ + "arm" + ], + "license": "MIT", + "optional": true, + "os": [ + "linux" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@node-rs/crc32-linux-arm64-gnu": { + "version": "1.10.6", + "resolved": "https://registry.npmjs.org/@node-rs/crc32-linux-arm64-gnu/-/crc32-linux-arm64-gnu-1.10.6.tgz", + "integrity": "sha512-LmWcfDbqAvypX0bQjQVPmQGazh4dLiVklkgHxpV4P0TcQ1DT86H/SWpMBMs/ncF8DGuCQ05cNyMv1iddUDugoQ==", + "cpu": [ + "arm64" + ], + "license": "MIT", + "optional": true, + "os": [ + "linux" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@node-rs/crc32-linux-arm64-musl": { + 
"version": "1.10.6", + "resolved": "https://registry.npmjs.org/@node-rs/crc32-linux-arm64-musl/-/crc32-linux-arm64-musl-1.10.6.tgz", + "integrity": "sha512-k8ra/bmg0hwRrIEE8JL1p32WfaN9gDlUUpQRWsbxd1WhjqvXea7kKO6K4DwVxyxlPhBS9Gkb5Urq7Y4mXANzaw==", + "cpu": [ + "arm64" + ], + "license": "MIT", + "optional": true, + "os": [ + "linux" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@node-rs/crc32-linux-x64-gnu": { + "version": "1.10.6", + "resolved": "https://registry.npmjs.org/@node-rs/crc32-linux-x64-gnu/-/crc32-linux-x64-gnu-1.10.6.tgz", + "integrity": "sha512-IfjtqcuFK7JrSZ9mlAFhb83xgium30PguvRjIMI45C3FJwu18bnLk1oR619IYb/zetQT82MObgmqfKOtgemEKw==", + "cpu": [ + "x64" + ], + "license": "MIT", + "optional": true, + "os": [ + "linux" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@node-rs/crc32-linux-x64-musl": { + "version": "1.10.6", + "resolved": "https://registry.npmjs.org/@node-rs/crc32-linux-x64-musl/-/crc32-linux-x64-musl-1.10.6.tgz", + "integrity": "sha512-LbFYsA5M9pNunOweSt6uhxenYQF94v3bHDAQRPTQ3rnjn+mK6IC7YTAYoBjvoJP8lVzcvk9hRj8wp4Jyh6Y80g==", + "cpu": [ + "x64" + ], + "license": "MIT", + "optional": true, + "os": [ + "linux" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@node-rs/crc32-wasm32-wasi": { + "version": "1.10.6", + "resolved": "https://registry.npmjs.org/@node-rs/crc32-wasm32-wasi/-/crc32-wasm32-wasi-1.10.6.tgz", + "integrity": "sha512-KaejdLgHMPsRaxnM+OG9L9XdWL2TabNx80HLdsCOoX9BVhEkfh39OeahBo8lBmidylKbLGMQoGfIKDjq0YMStw==", + "cpu": [ + "wasm32" + ], + "license": "MIT", + "optional": true, + "dependencies": { + "@napi-rs/wasm-runtime": "^0.2.5" + }, + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/@node-rs/crc32-win32-arm64-msvc": { + "version": "1.10.6", + "resolved": "https://registry.npmjs.org/@node-rs/crc32-win32-arm64-msvc/-/crc32-win32-arm64-msvc-1.10.6.tgz", + "integrity": "sha512-x50AXiSxn5Ccn+dCjLf1T7ZpdBiV1Sp5aC+H2ijhJO4alwznvXgWbopPRVhbp2nj0i+Gb6kkDUEyU+508KAdGQ==", + "cpu": [ 
+ "arm64" + ], + "license": "MIT", + "optional": true, + "os": [ + "win32" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@node-rs/crc32-win32-ia32-msvc": { + "version": "1.10.6", + "resolved": "https://registry.npmjs.org/@node-rs/crc32-win32-ia32-msvc/-/crc32-win32-ia32-msvc-1.10.6.tgz", + "integrity": "sha512-DpDxQLaErJF9l36aghe1Mx+cOnYLKYo6qVPqPL9ukJ5rAGLtCdU0C+Zoi3gs9ySm8zmbFgazq/LvmsZYU42aBw==", + "cpu": [ + "ia32" + ], + "license": "MIT", + "optional": true, + "os": [ + "win32" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@node-rs/crc32-win32-x64-msvc": { + "version": "1.10.6", + "resolved": "https://registry.npmjs.org/@node-rs/crc32-win32-x64-msvc/-/crc32-win32-x64-msvc-1.10.6.tgz", + "integrity": "sha512-5B1vXosIIBw1m2Rcnw62IIfH7W9s9f7H7Ma0rRuhT8HR4Xh8QCgw6NJSI2S2MCngsGktYnAhyUvs81b7efTyQw==", + "cpu": [ + "x64" + ], + "license": "MIT", + "optional": true, + "os": [ + "win32" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@pinojs/redact": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/@pinojs/redact/-/redact-0.4.0.tgz", + "integrity": "sha512-k2ENnmBugE/rzQfEcdWHcCY+/FM3VLzH9cYEsbdsoqrvzAKRhUZeRNhAZvB8OitQJ1TBed3yqWtdjzS6wJKBwg==", + "license": "MIT" + }, + "node_modules/@platformatic/dynamic-buffer": { + "version": "0.3.0", + "resolved": "https://registry.npmjs.org/@platformatic/dynamic-buffer/-/dynamic-buffer-0.3.0.tgz", + "integrity": "sha512-x/SN9lEK9qYqRXAQKXC61bD2LPCd3uUYfzAIX7+tpgaCLky7voJ/xcSh9cC+jmNbbzNH8cxDzYUJD4Lua5VmlQ==", + "license": "Apache-2.0", + "engines": { + "node": ">= 22.19.0" + } + }, + "node_modules/@platformatic/kafka": { + "version": "1.28.0", + "resolved": "https://registry.npmjs.org/@platformatic/kafka/-/kafka-1.28.0.tgz", + "integrity": "sha512-JwG9FG36gwXpu4j3+61sLbHGSXkxTDc0eGowGsbcimiL+AnnI4rUWSqkCurBmAuBRbH6fa/BX54YfYFMHAr1fg==", + "license": "Apache-2.0", + "dependencies": { + "@platformatic/dynamic-buffer": "^0.3.0", + "@platformatic/wasm-utils": 
"^0.1.0", + "ajv": "^8.17.1", + "avsc": "^5.7.9", + "debug": "^4.4.3", + "fastq": "^1.19.1", + "mnemonist": "^0.40.3", + "scule": "^1.3.0" + }, + "engines": { + "node": ">= 20.19.4 || >= 22.18.0 || >= 24.6.0" + }, + "optionalDependencies": { + "@node-rs/crc32": "^1.10.6", + "protobufjs": "^8.0.0" + } + }, + "node_modules/@platformatic/wasm-utils": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/@platformatic/wasm-utils/-/wasm-utils-0.1.0.tgz", + "integrity": "sha512-1fMkKsdud0pNlBh0JRELpWi3NE3/Kb0SCKR+5g2vH3ANDNlWDPJVDjNIJZYNdXA73yBfTVVve7xxBV22PUqZeg==", + "license": "Apache-2.0", + "dependencies": { + "@platformatic/dynamic-buffer": "^0.3.0" + }, + "engines": { + "node": ">= 22.19.0" + } + }, + "node_modules/@protobufjs/aspromise": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/@protobufjs/aspromise/-/aspromise-1.1.2.tgz", + "integrity": "sha512-j+gKExEuLmKwvz3OgROXtrJ2UG2x8Ch2YZUxahh+s1F2HZ+wAceUNLkvy6zKCPVRkU++ZWQrdxsUeQXmcg4uoQ==", + "license": "BSD-3-Clause", + "optional": true + }, + "node_modules/@protobufjs/base64": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/@protobufjs/base64/-/base64-1.1.2.tgz", + "integrity": "sha512-AZkcAA5vnN/v4PDqKyMR5lx7hZttPDgClv83E//FMNhR2TMcLUhfRUBHCmSl0oi9zMgDDqRUJkSxO3wm85+XLg==", + "license": "BSD-3-Clause", + "optional": true + }, + "node_modules/@protobufjs/codegen": { + "version": "2.0.4", + "resolved": "https://registry.npmjs.org/@protobufjs/codegen/-/codegen-2.0.4.tgz", + "integrity": "sha512-YyFaikqM5sH0ziFZCN3xDC7zeGaB/d0IUb9CATugHWbd1FRFwWwt4ld4OYMPWu5a3Xe01mGAULCdqhMlPl29Jg==", + "license": "BSD-3-Clause", + "optional": true + }, + "node_modules/@protobufjs/eventemitter": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@protobufjs/eventemitter/-/eventemitter-1.1.0.tgz", + "integrity": "sha512-j9ednRT81vYJ9OfVuXG6ERSTdEL1xVsNgqpkxMsbIabzSo3goCjDIveeGv5d03om39ML71RdmrGNjG5SReBP/Q==", + "license": "BSD-3-Clause", + "optional": true + }, + 
"node_modules/@protobufjs/fetch": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@protobufjs/fetch/-/fetch-1.1.0.tgz", + "integrity": "sha512-lljVXpqXebpsijW71PZaCYeIcE5on1w5DlQy5WH6GLbFryLUrBD4932W/E2BSpfRJWseIL4v/KPgBFxDOIdKpQ==", + "license": "BSD-3-Clause", + "optional": true, + "dependencies": { + "@protobufjs/aspromise": "^1.1.1", + "@protobufjs/inquire": "^1.1.0" + } + }, + "node_modules/@protobufjs/float": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/@protobufjs/float/-/float-1.0.2.tgz", + "integrity": "sha512-Ddb+kVXlXst9d+R9PfTIxh1EdNkgoRe5tOX6t01f1lYWOvJnSPDBlG241QLzcyPdoNTsblLUdujGSE4RzrTZGQ==", + "license": "BSD-3-Clause", + "optional": true + }, + "node_modules/@protobufjs/inquire": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@protobufjs/inquire/-/inquire-1.1.0.tgz", + "integrity": "sha512-kdSefcPdruJiFMVSbn801t4vFK7KB/5gd2fYvrxhuJYg8ILrmn9SKSX2tZdV6V+ksulWqS7aXjBcRXl3wHoD9Q==", + "license": "BSD-3-Clause", + "optional": true + }, + "node_modules/@protobufjs/path": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/@protobufjs/path/-/path-1.1.2.tgz", + "integrity": "sha512-6JOcJ5Tm08dOHAbdR3GrvP+yUUfkjG5ePsHYczMFLq3ZmMkAD98cDgcT2iA1lJ9NVwFd4tH/iSSoe44YWkltEA==", + "license": "BSD-3-Clause", + "optional": true + }, + "node_modules/@protobufjs/pool": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@protobufjs/pool/-/pool-1.1.0.tgz", + "integrity": "sha512-0kELaGSIDBKvcgS4zkjz1PeddatrjYcmMWOlAuAPwAeccUrPHdUqo/J6LiymHHEiJT5NrF1UVwxY14f+fy4WQw==", + "license": "BSD-3-Clause", + "optional": true + }, + "node_modules/@protobufjs/utf8": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@protobufjs/utf8/-/utf8-1.1.0.tgz", + "integrity": "sha512-Vvn3zZrhQZkkBE8LSuW3em98c0FwgO4nxzv6OdSxPKJIEKY2bGbHn+mhGIPerzI4twdxaP8/0+06HBpwf345Lw==", + "license": "BSD-3-Clause", + "optional": true + }, + "node_modules/@tybys/wasm-util": { + "version": "0.10.1", + 
"resolved": "https://registry.npmjs.org/@tybys/wasm-util/-/wasm-util-0.10.1.tgz", + "integrity": "sha512-9tTaPJLSiejZKx+Bmog4uSubteqTvFrVrURwkmHixBo0G4seD0zUxp98E1DzUBJxLQ3NPwXrGKDiVjwx/DpPsg==", + "license": "MIT", + "optional": true, + "dependencies": { + "tslib": "^2.4.0" + } + }, + "node_modules/@types/node": { + "version": "25.3.2", + "resolved": "https://registry.npmjs.org/@types/node/-/node-25.3.2.tgz", + "integrity": "sha512-RpV6r/ij22zRRdyBPcxDeKAzH43phWVKEjL2iksqo1Vz3CuBUrgmPpPhALKiRfU7OMCmeeO9vECBMsV0hMTG8Q==", + "devOptional": true, + "license": "MIT", + "dependencies": { + "undici-types": "~7.18.0" + } + }, + "node_modules/@types/pg": { + "version": "8.16.0", + "resolved": "https://registry.npmjs.org/@types/pg/-/pg-8.16.0.tgz", + "integrity": "sha512-RmhMd/wD+CF8Dfo+cVIy3RR5cl8CyfXQ0tGgW6XBL8L4LM/UTEbNXYRbLwU6w+CgrKBNbrQWt4FUtTfaU5jSYQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*", + "pg-protocol": "*", + "pg-types": "^2.2.0" + } + }, + "node_modules/ajv": { + "version": "8.18.0", + "resolved": "https://registry.npmjs.org/ajv/-/ajv-8.18.0.tgz", + "integrity": "sha512-PlXPeEWMXMZ7sPYOHqmDyCJzcfNrUr3fGNKtezX14ykXOEIvyK81d+qydx89KY5O71FKMPaQ2vBfBFI5NHR63A==", + "license": "MIT", + "dependencies": { + "fast-deep-equal": "^3.1.3", + "fast-uri": "^3.0.1", + "json-schema-traverse": "^1.0.0", + "require-from-string": "^2.0.2" + }, + "funding": { + "type": "github", + "url": "https://github.com/sponsors/epoberezkin" + } + }, + "node_modules/atomic-sleep": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/atomic-sleep/-/atomic-sleep-1.0.0.tgz", + "integrity": "sha512-kNOjDqAh7px0XWNI+4QbzoiR/nTkHAWNud2uvnJquD1/x5a7EQZMJT0AczqK0Qn67oY/TTQ1LbUKajZpp3I9tQ==", + "license": "MIT", + "engines": { + "node": ">=8.0.0" + } + }, + "node_modules/avsc": { + "version": "5.7.9", + "resolved": "https://registry.npmjs.org/avsc/-/avsc-5.7.9.tgz", + "integrity": 
"sha512-yOA4wFeI7ET3v32Di/sUybQ+ttP20JHSW3mxLuNGeO0uD6PPcvLrIQXSvy/rhJOWU5JrYh7U4OHplWMmtAtjMg==", + "license": "MIT", + "engines": { + "node": ">=0.11" + } + }, + "node_modules/colorette": { + "version": "2.0.20", + "resolved": "https://registry.npmjs.org/colorette/-/colorette-2.0.20.tgz", + "integrity": "sha512-IfEDxwoWIjkeXL1eXcDiow4UbKjhLdq6/EuSVR9GMN7KVH3r9gQ83e73hsz1Nd1T3ijd5xv1wcWRYO+D6kCI2w==", + "license": "MIT" + }, + "node_modules/dateformat": { + "version": "4.6.3", + "resolved": "https://registry.npmjs.org/dateformat/-/dateformat-4.6.3.tgz", + "integrity": "sha512-2P0p0pFGzHS5EMnhdxQi7aJN+iMheud0UhG4dlE1DLAlvL8JHjJJTX/CSm4JXwV0Ka5nGk3zC5mcb5bUQUxxMA==", + "license": "MIT", + "engines": { + "node": "*" + } + }, + "node_modules/debug": { + "version": "4.4.3", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.3.tgz", + "integrity": "sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA==", + "license": "MIT", + "dependencies": { + "ms": "^2.1.3" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, + "node_modules/dot-prop": { + "version": "6.0.1", + "resolved": "https://registry.npmjs.org/dot-prop/-/dot-prop-6.0.1.tgz", + "integrity": "sha512-tE7ztYzXHIeyvc7N+hR3oi7FIbf/NIjVP9hmAt3yMXzrQ072/fpjGLx2GxNxGxUl5V73MEqYzioOMoVhGMJ5cA==", + "license": "MIT", + "dependencies": { + "is-obj": "^2.0.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/end-of-stream": { + "version": "1.4.5", + "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.5.tgz", + "integrity": "sha512-ooEGc6HP26xXq/N+GCGOT0JKCLDGrq2bQUZrQ7gyrJiZANJ/8YDTxTpQBXGMn+WbIQXNVpyWymm7KYVICQnyOg==", + "license": "MIT", + "dependencies": { + "once": "^1.4.0" + } + }, + "node_modules/fast-copy": { + "version": "4.0.2", + "resolved": 
"https://registry.npmjs.org/fast-copy/-/fast-copy-4.0.2.tgz", + "integrity": "sha512-ybA6PDXIXOXivLJK/z9e+Otk7ve13I4ckBvGO5I2RRmBU1gMHLVDJYEuJYhGwez7YNlYji2M2DvVU+a9mSFDlw==", + "license": "MIT" + }, + "node_modules/fast-deep-equal": { + "version": "3.1.3", + "resolved": "https://registry.npmjs.org/fast-deep-equal/-/fast-deep-equal-3.1.3.tgz", + "integrity": "sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==", + "license": "MIT" + }, + "node_modules/fast-safe-stringify": { + "version": "2.1.1", + "resolved": "https://registry.npmjs.org/fast-safe-stringify/-/fast-safe-stringify-2.1.1.tgz", + "integrity": "sha512-W+KJc2dmILlPplD/H4K9l9LcAHAfPtP6BY84uVLXQ6Evcz9Lcg33Y2z1IVblT6xdY54PXYVHEv+0Wpq8Io6zkA==", + "license": "MIT" + }, + "node_modules/fast-uri": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/fast-uri/-/fast-uri-3.1.0.tgz", + "integrity": "sha512-iPeeDKJSWf4IEOasVVrknXpaBV0IApz/gp7S2bb7Z4Lljbl2MGJRqInZiUrQwV16cpzw/D3S5j5Julj/gT52AA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/fastify" + }, + { + "type": "opencollective", + "url": "https://opencollective.com/fastify" + } + ], + "license": "BSD-3-Clause" + }, + "node_modules/fastq": { + "version": "1.20.1", + "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.20.1.tgz", + "integrity": "sha512-GGToxJ/w1x32s/D2EKND7kTil4n8OVk/9mycTc4VDza13lOvpUZTGX3mFSCtV9ksdGBVzvsyAVLM6mHFThxXxw==", + "license": "ISC", + "dependencies": { + "reusify": "^1.0.4" + } + }, + "node_modules/help-me": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/help-me/-/help-me-5.0.0.tgz", + "integrity": "sha512-7xgomUX6ADmcYzFik0HzAxh/73YlKR9bmFzf51CZwR+b6YtzU2m0u49hQCqV6SvlqIqsaxovfwdvbnsw3b/zpg==", + "license": "MIT" + }, + "node_modules/is-obj": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/is-obj/-/is-obj-2.0.0.tgz", + "integrity": 
"sha512-drqDG3cbczxxEJRoOXcOjtdp1J/lyp1mNn0xaznRs8+muBhgQcrnbspox5X5fOw0HnMnbfDzvnEMEtqDEJEo8w==", + "license": "MIT", + "engines": { + "node": ">=8" + } + }, + "node_modules/joycon": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/joycon/-/joycon-3.1.1.tgz", + "integrity": "sha512-34wB/Y7MW7bzjKRjUKTa46I2Z7eV62Rkhva+KkopW7Qvv/OSWBqvkSY7vusOPrNuZcUG3tApvdVgNB8POj3SPw==", + "license": "MIT", + "engines": { + "node": ">=10" + } + }, + "node_modules/json-schema-traverse": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-1.0.0.tgz", + "integrity": "sha512-NM8/P9n3XjXhIZn1lLhkFaACTOURQXjWhV4BA/RnOv8xvgqtqpAX9IO4mRQxSx1Rlo4tqzeqb0sOlruaOy3dug==", + "license": "MIT" + }, + "node_modules/long": { + "version": "5.3.2", + "resolved": "https://registry.npmjs.org/long/-/long-5.3.2.tgz", + "integrity": "sha512-mNAgZ1GmyNhD7AuqnTG3/VQ26o760+ZYBPKjPvugO8+nLbYfX6TVpJPseBvopbdY+qpZ/lKUnmEc1LeZYS3QAA==", + "license": "Apache-2.0", + "optional": true + }, + "node_modules/minimist": { + "version": "1.2.8", + "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.8.tgz", + "integrity": "sha512-2yyAR8qBkN3YuheJanUpWC5U3bb5osDywNB8RzDVlDwDHbocAJveqqj1u8+SVD7jkWT4yvsHCpWqqWqAxb0zCA==", + "license": "MIT", + "funding": { + "url": "https://github.com/sponsors/ljharb" + } + }, + "node_modules/mnemonist": { + "version": "0.40.3", + "resolved": "https://registry.npmjs.org/mnemonist/-/mnemonist-0.40.3.tgz", + "integrity": "sha512-Vjyr90sJ23CKKH/qPAgUKicw/v6pRoamxIEDFOF8uSgFME7DqPRpHgRTejWVjkdGg5dXj0/NyxZHZ9bcjH+2uQ==", + "license": "MIT", + "dependencies": { + "obliterator": "^2.0.4" + } + }, + "node_modules/ms": { + "version": "2.1.3", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", + "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==", + "license": "MIT" + }, + "node_modules/obliterator": { + "version": "2.0.5", + "resolved": 
"https://registry.npmjs.org/obliterator/-/obliterator-2.0.5.tgz", + "integrity": "sha512-42CPE9AhahZRsMNslczq0ctAEtqk8Eka26QofnqC346BZdHDySk3LWka23LI7ULIw11NmltpiLagIq8gBozxTw==", + "license": "MIT" + }, + "node_modules/on-exit-leak-free": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/on-exit-leak-free/-/on-exit-leak-free-2.1.2.tgz", + "integrity": "sha512-0eJJY6hXLGf1udHwfNftBqH+g73EU4B504nZeKpz1sYRKafAghwxEJunB2O7rDZkL4PGfsMVnTXZ2EjibbqcsA==", + "license": "MIT", + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/once": { + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", + "integrity": "sha512-lNaJgI+2Q5URQBkccEKHTQOPaXdUxnZZElQTZY0MFUAuaEqe1E+Nyvgdz/aIyNi6Z9MzO5dv1H8n58/GELp3+w==", + "license": "ISC", + "dependencies": { + "wrappy": "1" + } + }, + "node_modules/pg": { + "version": "8.19.0", + "resolved": "https://registry.npmjs.org/pg/-/pg-8.19.0.tgz", + "integrity": "sha512-QIcLGi508BAHkQ3pJNptsFz5WQMlpGbuBGBaIaXsWK8mel2kQ/rThYI+DbgjUvZrIr7MiuEuc9LcChJoEZK1xQ==", + "license": "MIT", + "peer": true, + "dependencies": { + "pg-connection-string": "^2.11.0", + "pg-pool": "^3.12.0", + "pg-protocol": "^1.12.0", + "pg-types": "2.2.0", + "pgpass": "1.0.5" + }, + "engines": { + "node": ">= 16.0.0" + }, + "optionalDependencies": { + "pg-cloudflare": "^1.3.0" + }, + "peerDependencies": { + "pg-native": ">=3.0.1" + }, + "peerDependenciesMeta": { + "pg-native": { + "optional": true + } + } + }, + "node_modules/pg-cloudflare": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/pg-cloudflare/-/pg-cloudflare-1.3.0.tgz", + "integrity": "sha512-6lswVVSztmHiRtD6I8hw4qP/nDm1EJbKMRhf3HCYaqud7frGysPv7FYJ5noZQdhQtN2xJnimfMtvQq21pdbzyQ==", + "license": "MIT", + "optional": true + }, + "node_modules/pg-connection-string": { + "version": "2.11.0", + "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.11.0.tgz", + "integrity": 
"sha512-kecgoJwhOpxYU21rZjULrmrBJ698U2RxXofKVzOn5UDj61BPj/qMb7diYUR1nLScCDbrztQFl1TaQZT0t1EtzQ==", + "license": "MIT" + }, + "node_modules/pg-int8": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz", + "integrity": "sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==", + "license": "ISC", + "engines": { + "node": ">=4.0.0" + } + }, + "node_modules/pg-pool": { + "version": "3.12.0", + "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.12.0.tgz", + "integrity": "sha512-eIJ0DES8BLaziFHW7VgJEBPi5hg3Nyng5iKpYtj3wbcAUV9A1wLgWiY7ajf/f/oO1wfxt83phXPY8Emztg7ITg==", + "license": "MIT", + "peerDependencies": { + "pg": ">=8.0" + } + }, + "node_modules/pg-protocol": { + "version": "1.12.0", + "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.12.0.tgz", + "integrity": "sha512-uOANXNRACNdElMXJ0tPz6RBM0XQ61nONGAwlt8da5zs/iUOOCLBQOHSXnrC6fMsvtjxbOJrZZl5IScGv+7mpbg==", + "license": "MIT" + }, + "node_modules/pg-types": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/pg-types/-/pg-types-2.2.0.tgz", + "integrity": "sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==", + "license": "MIT", + "dependencies": { + "pg-int8": "1.0.1", + "postgres-array": "~2.0.0", + "postgres-bytea": "~1.0.0", + "postgres-date": "~1.0.4", + "postgres-interval": "^1.1.0" + }, + "engines": { + "node": ">=4" + } + }, + "node_modules/pgpass": { + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/pgpass/-/pgpass-1.0.5.tgz", + "integrity": "sha512-FdW9r/jQZhSeohs1Z3sI1yxFQNFvMcnmfuj4WBMUTxOrAyLMaTcE1aAMBiTlbMNaXvBCQuVi0R7hd8udDSP7ug==", + "license": "MIT", + "dependencies": { + "split2": "^4.1.0" + } + }, + "node_modules/pino": { + "version": "10.3.1", + "resolved": "https://registry.npmjs.org/pino/-/pino-10.3.1.tgz", + "integrity": 
"sha512-r34yH/GlQpKZbU1BvFFqOjhISRo1MNx1tWYsYvmj6KIRHSPMT2+yHOEb1SG6NMvRoHRF0a07kCOox/9yakl1vg==", + "license": "MIT", + "dependencies": { + "@pinojs/redact": "^0.4.0", + "atomic-sleep": "^1.0.0", + "on-exit-leak-free": "^2.1.0", + "pino-abstract-transport": "^3.0.0", + "pino-std-serializers": "^7.0.0", + "process-warning": "^5.0.0", + "quick-format-unescaped": "^4.0.3", + "real-require": "^0.2.0", + "safe-stable-stringify": "^2.3.1", + "sonic-boom": "^4.0.1", + "thread-stream": "^4.0.0" + }, + "bin": { + "pino": "bin.js" + } + }, + "node_modules/pino-abstract-transport": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/pino-abstract-transport/-/pino-abstract-transport-3.0.0.tgz", + "integrity": "sha512-wlfUczU+n7Hy/Ha5j9a/gZNy7We5+cXp8YL+X+PG8S0KXxw7n/JXA3c46Y0zQznIJ83URJiwy7Lh56WLokNuxg==", + "license": "MIT", + "dependencies": { + "split2": "^4.0.0" + } + }, + "node_modules/pino-pretty": { + "version": "13.1.3", + "resolved": "https://registry.npmjs.org/pino-pretty/-/pino-pretty-13.1.3.tgz", + "integrity": "sha512-ttXRkkOz6WWC95KeY9+xxWL6AtImwbyMHrL1mSwqwW9u+vLp/WIElvHvCSDg0xO/Dzrggz1zv3rN5ovTRVowKg==", + "license": "MIT", + "dependencies": { + "colorette": "^2.0.7", + "dateformat": "^4.6.3", + "fast-copy": "^4.0.0", + "fast-safe-stringify": "^2.1.1", + "help-me": "^5.0.0", + "joycon": "^3.1.1", + "minimist": "^1.2.6", + "on-exit-leak-free": "^2.1.0", + "pino-abstract-transport": "^3.0.0", + "pump": "^3.0.0", + "secure-json-parse": "^4.0.0", + "sonic-boom": "^4.0.1", + "strip-json-comments": "^5.0.2" + }, + "bin": { + "pino-pretty": "bin.js" + } + }, + "node_modules/pino-std-serializers": { + "version": "7.1.0", + "resolved": "https://registry.npmjs.org/pino-std-serializers/-/pino-std-serializers-7.1.0.tgz", + "integrity": "sha512-BndPH67/JxGExRgiX1dX0w1FvZck5Wa4aal9198SrRhZjH3GxKQUKIBnYJTdj2HDN3UQAS06HlfcSbQj2OHmaw==", + "license": "MIT" + }, + "node_modules/postgres-array": { + "version": "2.0.0", + "resolved": 
"https://registry.npmjs.org/postgres-array/-/postgres-array-2.0.0.tgz", + "integrity": "sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==", + "license": "MIT", + "engines": { + "node": ">=4" + } + }, + "node_modules/postgres-bytea": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/postgres-bytea/-/postgres-bytea-1.0.1.tgz", + "integrity": "sha512-5+5HqXnsZPE65IJZSMkZtURARZelel2oXUEO8rH83VS/hxH5vv1uHquPg5wZs8yMAfdv971IU+kcPUczi7NVBQ==", + "license": "MIT", + "engines": { + "node": ">=0.10.0" + } + }, + "node_modules/postgres-date": { + "version": "1.0.7", + "resolved": "https://registry.npmjs.org/postgres-date/-/postgres-date-1.0.7.tgz", + "integrity": "sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==", + "license": "MIT", + "engines": { + "node": ">=0.10.0" + } + }, + "node_modules/postgres-interval": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/postgres-interval/-/postgres-interval-1.2.0.tgz", + "integrity": "sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==", + "license": "MIT", + "dependencies": { + "xtend": "^4.0.0" + }, + "engines": { + "node": ">=0.10.0" + } + }, + "node_modules/process-warning": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/process-warning/-/process-warning-5.0.0.tgz", + "integrity": "sha512-a39t9ApHNx2L4+HBnQKqxxHNs1r7KF+Intd8Q/g1bUh6q0WIp9voPXJ/x0j+ZL45KF1pJd9+q2jLIRMfvEshkA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/fastify" + }, + { + "type": "opencollective", + "url": "https://opencollective.com/fastify" + } + ], + "license": "MIT" + }, + "node_modules/protobufjs": { + "version": "8.0.0", + "resolved": "https://registry.npmjs.org/protobufjs/-/protobufjs-8.0.0.tgz", + "integrity": "sha512-jx6+sE9h/UryaCZhsJWbJtTEy47yXoGNYI4z8ZaRncM0zBKeRqjO2JEcOUYwrYGb1WLhXM1FfMzW3annvFv0rw==", + "hasInstallScript": true, + 
"license": "BSD-3-Clause", + "optional": true, + "dependencies": { + "@protobufjs/aspromise": "^1.1.2", + "@protobufjs/base64": "^1.1.2", + "@protobufjs/codegen": "^2.0.4", + "@protobufjs/eventemitter": "^1.1.0", + "@protobufjs/fetch": "^1.1.0", + "@protobufjs/float": "^1.0.2", + "@protobufjs/inquire": "^1.1.0", + "@protobufjs/path": "^1.1.2", + "@protobufjs/pool": "^1.1.0", + "@protobufjs/utf8": "^1.1.0", + "@types/node": ">=13.7.0", + "long": "^5.0.0" + }, + "engines": { + "node": ">=12.0.0" + } + }, + "node_modules/pump": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/pump/-/pump-3.0.3.tgz", + "integrity": "sha512-todwxLMY7/heScKmntwQG8CXVkWUOdYxIvY2s0VWAAMh/nd8SoYiRaKjlr7+iCs984f2P8zvrfWcDDYVb73NfA==", + "license": "MIT", + "dependencies": { + "end-of-stream": "^1.1.0", + "once": "^1.3.1" + } + }, + "node_modules/quick-format-unescaped": { + "version": "4.0.4", + "resolved": "https://registry.npmjs.org/quick-format-unescaped/-/quick-format-unescaped-4.0.4.tgz", + "integrity": "sha512-tYC1Q1hgyRuHgloV/YXs2w15unPVh8qfu/qCTfhTYamaw7fyhumKa2yGpdSo87vY32rIclj+4fWYQXUMs9EHvg==", + "license": "MIT" + }, + "node_modules/real-require": { + "version": "0.2.0", + "resolved": "https://registry.npmjs.org/real-require/-/real-require-0.2.0.tgz", + "integrity": "sha512-57frrGM/OCTLqLOAh0mhVA9VBMHd+9U7Zb2THMGdBUoZVOtGbJzjxsYGDJ3A9AYYCP4hn6y1TVbaOfzWtm5GFg==", + "license": "MIT", + "engines": { + "node": ">= 12.13.0" + } + }, + "node_modules/require-from-string": { + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/require-from-string/-/require-from-string-2.0.2.tgz", + "integrity": "sha512-Xf0nWe6RseziFMu+Ap9biiUbmplq6S9/p+7w7YXP/JBHhrUDDUhwa+vANyubuqfZWTveU//DYVGsDG7RKL/vEw==", + "license": "MIT", + "engines": { + "node": ">=0.10.0" + } + }, + "node_modules/reusify": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/reusify/-/reusify-1.1.0.tgz", + "integrity": 
"sha512-g6QUff04oZpHs0eG5p83rFLhHeV00ug/Yf9nZM6fLeUrPguBTkTQOdpAWWspMh55TZfVQDPaN3NQJfbVRAxdIw==", + "license": "MIT", + "engines": { + "iojs": ">=1.0.0", + "node": ">=0.10.0" + } + }, + "node_modules/safe-stable-stringify": { + "version": "2.5.0", + "resolved": "https://registry.npmjs.org/safe-stable-stringify/-/safe-stable-stringify-2.5.0.tgz", + "integrity": "sha512-b3rppTKm9T+PsVCBEOUR46GWI7fdOs00VKZ1+9c1EWDaDMvjQc6tUwuFyIprgGgTcWoVHSKrU8H31ZHA2e0RHA==", + "license": "MIT", + "engines": { + "node": ">=10" + } + }, + "node_modules/scule": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/scule/-/scule-1.3.0.tgz", + "integrity": "sha512-6FtHJEvt+pVMIB9IBY+IcCJ6Z5f1iQnytgyfKMhDKgmzYG+TeH/wx1y3l27rshSbLiSanrR9ffZDrEsmjlQF2g==", + "license": "MIT" + }, + "node_modules/secure-json-parse": { + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/secure-json-parse/-/secure-json-parse-4.1.0.tgz", + "integrity": "sha512-l4KnYfEyqYJxDwlNVyRfO2E4NTHfMKAWdUuA8J0yve2Dz/E/PdBepY03RvyJpssIpRFwJoCD55wA+mEDs6ByWA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/fastify" + }, + { + "type": "opencollective", + "url": "https://opencollective.com/fastify" + } + ], + "license": "BSD-3-Clause" + }, + "node_modules/sonic-boom": { + "version": "4.2.1", + "resolved": "https://registry.npmjs.org/sonic-boom/-/sonic-boom-4.2.1.tgz", + "integrity": "sha512-w6AxtubXa2wTXAUsZMMWERrsIRAdrK0Sc+FUytWvYAhBJLyuI4llrMIC1DtlNSdI99EI86KZum2MMq3EAZlF9Q==", + "license": "MIT", + "dependencies": { + "atomic-sleep": "^1.0.0" + } + }, + "node_modules/split2": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/split2/-/split2-4.2.0.tgz", + "integrity": "sha512-UcjcJOWknrNkF6PLX83qcHM6KHgVKNkV62Y8a5uYDVv9ydGQVwAHMKqHdJje1VTWpljG0WYpCDhrCdAOYH4TWg==", + "license": "ISC", + "engines": { + "node": ">= 10.x" + } + }, + "node_modules/strip-json-comments": { + "version": "5.0.3", + "resolved": 
"https://registry.npmjs.org/strip-json-comments/-/strip-json-comments-5.0.3.tgz", + "integrity": "sha512-1tB5mhVo7U+ETBKNf92xT4hrQa3pm0MZ0PQvuDnWgAAGHDsfp4lPSpiS6psrSiet87wyGPh9ft6wmhOMQ0hDiw==", + "license": "MIT", + "engines": { + "node": ">=14.16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/thread-stream": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/thread-stream/-/thread-stream-4.0.0.tgz", + "integrity": "sha512-4iMVL6HAINXWf1ZKZjIPcz5wYaOdPhtO8ATvZ+Xqp3BTdaqtAwQkNmKORqcIo5YkQqGXq5cwfswDwMqqQNrpJA==", + "license": "MIT", + "dependencies": { + "real-require": "^0.2.0" + }, + "engines": { + "node": ">=20" + } + }, + "node_modules/tslib": { + "version": "2.8.1", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz", + "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==", + "license": "0BSD" + }, + "node_modules/typescript": { + "version": "5.9.3", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.3.tgz", + "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", + "dev": true, + "license": "Apache-2.0", + "bin": { + "tsc": "bin/tsc", + "tsserver": "bin/tsserver" + }, + "engines": { + "node": ">=14.17" + } + }, + "node_modules/undici-types": { + "version": "7.18.2", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-7.18.2.tgz", + "integrity": "sha512-AsuCzffGHJybSaRrmr5eHr81mwJU3kjw6M+uprWvCXiNeN9SOGwQ3Jn8jb8m3Z6izVgknn1R0FTCEAP2QrLY/w==", + "devOptional": true, + "license": "MIT" + }, + "node_modules/wrappy": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", + "integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==", + "license": "ISC" + }, + "node_modules/xtend": { + "version": "4.0.2", + "resolved": 
"https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", + "integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==", + "license": "MIT", + "engines": { + "node": ">=0.4" + } + }, + "node_modules/zod": { + "version": "4.3.6", + "resolved": "https://registry.npmjs.org/zod/-/zod-4.3.6.tgz", + "integrity": "sha512-rftlrkhHZOcjDwkGlnUtZZkvaPHCsDATp4pGpuOOMDaTdDDXF91wuVDJoWoPsKX/3YPQ5fHuF3STjcYyKr+Qhg==", + "license": "MIT", + "peer": true, + "funding": { + "url": "https://github.com/sponsors/colinhacks" + } + } + } +} diff --git a/packages/kafka/load-tests/package.json b/packages/kafka/load-tests/package.json new file mode 100644 index 00000000..8efcfdfe --- /dev/null +++ b/packages/kafka/load-tests/package.json @@ -0,0 +1,43 @@ +{ + "name": "@message-queue-toolkit/kafka-load-tests", + "version": "0.0.0", + "private": true, + "type": "module", + "scripts": { + "lint:types": "tsc -noEmit", + "docker:start": "docker compose up -d --wait", + "docker:stop": "docker compose down -v", + "load:cdc": "node src/run.ts", + "load:cdc:light": "node src/run.ts --rate 100 --duration 30", + "load:cdc:medium": "node src/run.ts --rate 1000 --duration 60", + "load:cdc:heavy": "node src/run.ts --rate 5000 --duration 120", + "load:cdc:batch": "node src/run-batch.ts", + "load:cdc:batch:light": "node src/run-batch.ts --rate 100 --duration 30", + "load:cdc:batch:medium": "node src/run-batch.ts --rate 1000 --duration 60", + "load:cdc:batch:heavy": "node src/run-batch.ts --rate 5000 --duration 120", + "load:direct": "node src/run-direct.ts", + "load:direct:light": "node src/run-direct.ts --rate 100 --duration 30", + "load:direct:medium": "node src/run-direct.ts --rate 1000 --duration 60", + "load:direct:heavy": "node src/run-direct.ts --rate 5000 --duration 120", + "load:direct:batch": "node src/run-direct-batch.ts", + "load:direct:batch:light": "node src/run-direct-batch.ts --rate 100 --duration 30", + "load:direct:batch:medium": "node 
src/run-direct-batch.ts --rate 1000 --duration 60", + "load:direct:batch:heavy": "node src/run-direct-batch.ts --rate 5000 --duration 120" + }, + "engines": { + "node": ">=22.18.0" + }, + "dependencies": { + "@message-queue-toolkit/kafka": "file:../../kafka", + "@message-queue-toolkit/core": "file:../../core", + "@message-queue-toolkit/schemas": "file:../../schemas", + "@lokalise/node-core": "^14.2.0", + "@platformatic/kafka": "1.28.0", + "pg": "^8.19.0", + "zod": "^4.0.17" + }, + "devDependencies": { + "@types/pg": "^8.16.0", + "typescript": "^5.9.3" + } +} diff --git a/packages/kafka/load-tests/scripts/init-crdb.sh b/packages/kafka/load-tests/scripts/init-crdb.sh new file mode 100644 index 00000000..77eea2f9 --- /dev/null +++ b/packages/kafka/load-tests/scripts/init-crdb.sh @@ -0,0 +1,59 @@ +#!/bin/bash +set -e + +CRDB_HOST="cockroachdb" +CRDB_PORT="26257" + +echo "Waiting for CockroachDB to be ready..." +until /cockroach/cockroach sql --insecure --host="$CRDB_HOST" --port="$CRDB_PORT" -e "SELECT 1" > /dev/null 2>&1; do + sleep 1 +done +echo "CockroachDB is ready." 
+ +/cockroach/cockroach sql --insecure --host="$CRDB_HOST" --port="$CRDB_PORT" <<'SQL' + +CREATE DATABASE IF NOT EXISTS loadtest; + +USE loadtest; + +CREATE TABLE IF NOT EXISTS events ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + event_type STRING NOT NULL, + payload JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE TABLE IF NOT EXISTS orders ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + customer_id STRING NOT NULL, + amount DECIMAL NOT NULL, + status STRING NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +SET CLUSTER SETTING kv.rangefeed.enabled = true; + +SQL + +# Check for existing running changefeeds before creating a new one +EXISTING=$(/cockroach/cockroach sql --insecure --host="$CRDB_HOST" --port="$CRDB_PORT" \ + --format=csv -e "SELECT count(*) FROM [SHOW CHANGEFEED JOBS] WHERE status = 'running'" 2>/dev/null | tail -1) + +if [ "${EXISTING:-0}" -gt 0 ]; then + echo "Changefeed already exists, skipping creation." +else + /cockroach/cockroach sql --insecure --host="$CRDB_HOST" --port="$CRDB_PORT" <<'SQL' +USE loadtest; + +CREATE CHANGEFEED FOR events, orders + INTO 'kafka://kafka:9093' + WITH format = json, + updated, + resolved = '5s', + diff, + kafka_sink_config = '{"Flush":{"MaxMessages":100,"Frequency":"500ms"}}'; +SQL + echo "Changefeed created successfully." +fi + +echo "Database and tables initialized successfully." 
diff --git a/packages/kafka/load-tests/src/batch-load-generator.ts b/packages/kafka/load-tests/src/batch-load-generator.ts new file mode 100644 index 00000000..3fc1f602 --- /dev/null +++ b/packages/kafka/load-tests/src/batch-load-generator.ts @@ -0,0 +1,90 @@ +import { setTimeout } from 'node:timers/promises' +import { CdcBatchConsumer } from './cdc-batch-consumer.ts' +import { config } from './config.ts' +import { CrdbClient } from './crdb-client.ts' +import { MetricsCollector } from './metrics-collector.ts' + +export interface BatchLoadTestOptions { + rate: number + duration: number + batchSize: number + consumerBatchSize: number + consumerBatchTimeoutMs: number +} + +export async function runBatchLoadTest(options: BatchLoadTestOptions): Promise { + const { rate, duration, batchSize, consumerBatchSize, consumerBatchTimeoutMs } = options + + console.log( + `Starting CDC batch load test: ${rate} rows/sec, ${duration}s duration, insert batch=${batchSize}, consumer batch=${consumerBatchSize}, timeout=${consumerBatchTimeoutMs}ms`, + ) + + const metrics = new MetricsCollector() + const crdb = new CrdbClient() + const consumer = new CdcBatchConsumer(metrics, consumerBatchSize, consumerBatchTimeoutMs) + + console.log('Initializing Kafka batch consumer...') + await consumer.init() + console.log('Batch consumer ready.') + + const reportInterval = setInterval(() => metrics.report(), config.reportIntervalMs) + + const totalRows = rate * duration + + console.log(`Generating ${totalRows.toLocaleString()} total rows`) + + const loadStartTime = Date.now() + let totalInserted = 0 + let inflight = 0 + const maxInflight = 10 + + while (totalInserted < totalRows) { + while (inflight >= maxInflight) { + await setTimeout(5) + } + + const remaining = totalRows - totalInserted + const currentBatch = Math.min(batchSize, remaining) + const eventCount = Math.ceil(currentBatch / 2) + const orderCount = currentBatch - eventCount + + totalInserted += currentBatch + inflight++ + + 
Promise.all([crdb.insertEvents(eventCount), crdb.insertOrders(orderCount)]) + .then(() => metrics.recordProduced(currentBatch)) + .catch((err) => console.error('Insert error:', err)) + .finally(() => { + inflight-- + }) + + if (rate > 0) { + const elapsed = Date.now() - loadStartTime + const expectedElapsed = (totalInserted / rate) * 1000 + const sleepMs = expectedElapsed - elapsed + if (sleepMs > 0) { + await setTimeout(sleepMs) + } + } + } + + while (inflight > 0) { + await setTimeout(10) + } + + console.log(`\nLoad generation complete. ${totalInserted.toLocaleString()} rows inserted.`) + console.log(`Waiting up to ${config.drainTimeoutMs / 1000}s for consumer to drain...`) + + const drainStart = Date.now() + while (metrics.backlog > 0 && Date.now() - drainStart < config.drainTimeoutMs) { + await setTimeout(500) + } + + clearInterval(reportInterval) + metrics.printFinalReport() + + console.log('Shutting down...') + await consumer.close() + await crdb.close() + console.log('Done.') +} diff --git a/packages/kafka/load-tests/src/cdc-batch-consumer.ts b/packages/kafka/load-tests/src/cdc-batch-consumer.ts new file mode 100644 index 00000000..e5ebdf6e --- /dev/null +++ b/packages/kafka/load-tests/src/cdc-batch-consumer.ts @@ -0,0 +1,94 @@ +import { randomUUID } from 'node:crypto' +import { globalLogger } from '@lokalise/node-core' +import type { KafkaConsumerDependencies } from '@message-queue-toolkit/kafka' +import { + AbstractKafkaConsumer, + KafkaHandlerConfig, + KafkaHandlerRoutingBuilder, +} from '@message-queue-toolkit/kafka' +import { + CDC_EVENT_SCHEMA, + CDC_ORDER_SCHEMA, + type CDC_TOPICS_CONFIG, + type CdcEvent, + type CdcOrder, +} from './cdc-schemas.ts' +import { config } from './config.ts' +import type { MetricsCollector } from './metrics-collector.ts' + +type ExecutionContext = { + metrics: MetricsCollector +} + +export class CdcBatchConsumer extends AbstractKafkaConsumer< + typeof CDC_TOPICS_CONFIG, + ExecutionContext, + true +> { + constructor(metrics: 
MetricsCollector, batchSize = 50, timeoutMs = 200) { + const deps: KafkaConsumerDependencies = { + logger: globalLogger, + errorReporter: { report: () => {} }, + transactionObservabilityManager: { + start: () => {}, + stop: () => {}, + startWithGroup: () => {}, + addCustomAttributes: () => {}, + }, + } + + super( + deps, + { + kafka: { + bootstrapBrokers: config.kafka.bootstrapBrokers, + clientId: `cdc-batch-consumer-${randomUUID()}`, + }, + groupId: `cdc-batch-load-test-${randomUUID()}`, + batchProcessingEnabled: true, + batchProcessingOptions: { + batchSize, + timeoutMilliseconds: timeoutMs, + }, + handlers: new KafkaHandlerRoutingBuilder() + .addConfig( + 'events', + new KafkaHandlerConfig(CDC_EVENT_SCHEMA, (messages, ctx) => { + console.log( + `[${new Date().toISOString()}] [batch-handler] events batch received: ${messages.length} messages`, + ) + for (const message of messages) { + const value = message.value as CdcEvent + if (!value.after) continue // skip resolved heartbeats + let loadtestTs: number | undefined + const payload = value.after.payload as Record + if (typeof payload.loadtest_ts === 'number') { + loadtestTs = payload.loadtest_ts + } + ctx.metrics.recordConsumed('events', loadtestTs) + } + }), + ) + .addConfig( + 'orders', + new KafkaHandlerConfig(CDC_ORDER_SCHEMA, (messages, ctx) => { + console.log( + `[${new Date().toISOString()}] [batch-handler] orders batch received: ${messages.length} messages`, + ) + for (const message of messages) { + const value = message.value as CdcOrder + if (!value.after) continue // skip resolved heartbeats + ctx.metrics.recordConsumed('orders') + } + }), + ) + .build(), + autocreateTopics: true, + logMessages: true, + handlerSpy: false, + maxWaitTime: 5, + }, + { metrics }, + ) + } +} diff --git a/packages/kafka/load-tests/src/cdc-consumer.ts b/packages/kafka/load-tests/src/cdc-consumer.ts new file mode 100644 index 00000000..5fec54ad --- /dev/null +++ b/packages/kafka/load-tests/src/cdc-consumer.ts @@ -0,0 +1,80 @@ 
+import { randomUUID } from 'node:crypto' +import { globalLogger } from '@lokalise/node-core' +import type { KafkaConsumerDependencies } from '@message-queue-toolkit/kafka' +import { + AbstractKafkaConsumer, + KafkaHandlerConfig, + KafkaHandlerRoutingBuilder, +} from '@message-queue-toolkit/kafka' +import { + CDC_EVENT_SCHEMA, + CDC_ORDER_SCHEMA, + type CDC_TOPICS_CONFIG, + type CdcEvent, + type CdcOrder, +} from './cdc-schemas.ts' +import { config } from './config.ts' +import type { MetricsCollector } from './metrics-collector.ts' + +type ExecutionContext = { + metrics: MetricsCollector +} + +export class CdcConsumer extends AbstractKafkaConsumer { + constructor(metrics: MetricsCollector) { + const deps: KafkaConsumerDependencies = { + logger: globalLogger, + errorReporter: { report: () => {} }, + transactionObservabilityManager: { + start: () => {}, + stop: () => {}, + startWithGroup: () => {}, + addCustomAttributes: () => {}, + }, + } + + super( + deps, + { + kafka: { + bootstrapBrokers: config.kafka.bootstrapBrokers, + clientId: `cdc-consumer-${randomUUID()}`, + }, + groupId: `cdc-load-test-${randomUUID()}`, + batchProcessingEnabled: false, + handlers: new KafkaHandlerRoutingBuilder< + typeof CDC_TOPICS_CONFIG, + ExecutionContext, + false + >() + .addConfig( + 'events', + new KafkaHandlerConfig(CDC_EVENT_SCHEMA, (message, ctx) => { + const value = message.value as CdcEvent + if (!value.after) return // skip resolved heartbeats + let loadtestTs: number | undefined + const payload = value.after.payload as Record + if (typeof payload.loadtest_ts === 'number') { + loadtestTs = payload.loadtest_ts + } + ctx.metrics.recordConsumed('events', loadtestTs) + }), + ) + .addConfig( + 'orders', + new KafkaHandlerConfig(CDC_ORDER_SCHEMA, (message, ctx) => { + const value = message.value as CdcOrder + if (!value.after) return // skip resolved heartbeats + ctx.metrics.recordConsumed('orders') + }), + ) + .build(), + autocreateTopics: true, + logMessages: false, + handlerSpy: 
false, + maxWaitTime: 5, + }, + { metrics }, + ) + } +} diff --git a/packages/kafka/load-tests/src/cdc-schemas.ts b/packages/kafka/load-tests/src/cdc-schemas.ts new file mode 100644 index 00000000..fd26a576 --- /dev/null +++ b/packages/kafka/load-tests/src/cdc-schemas.ts @@ -0,0 +1,38 @@ +import type { TopicConfig } from '@message-queue-toolkit/kafka' +import z from 'zod/v4' + +const eventRowSchema = z.object({ + id: z.string(), + event_type: z.string(), + payload: z.record(z.string(), z.unknown()), + created_at: z.string(), +}) + +const orderRowSchema = z.object({ + id: z.string(), + customer_id: z.string(), + amount: z.coerce.string(), + status: z.string(), + created_at: z.string(), +}) + +export const CDC_EVENT_SCHEMA = z.object({ + after: eventRowSchema.nullable().optional(), + before: eventRowSchema.nullable().optional(), + updated: z.string().optional(), + resolved: z.string().optional(), +}) +export type CdcEvent = z.output + +export const CDC_ORDER_SCHEMA = z.object({ + after: orderRowSchema.nullable().optional(), + before: orderRowSchema.nullable().optional(), + updated: z.string().optional(), + resolved: z.string().optional(), +}) +export type CdcOrder = z.output + +export const CDC_TOPICS_CONFIG = [ + { topic: 'events', schema: CDC_EVENT_SCHEMA }, + { topic: 'orders', schema: CDC_ORDER_SCHEMA }, +] as const satisfies TopicConfig[] diff --git a/packages/kafka/load-tests/src/config.ts b/packages/kafka/load-tests/src/config.ts new file mode 100644 index 00000000..71aa941b --- /dev/null +++ b/packages/kafka/load-tests/src/config.ts @@ -0,0 +1,13 @@ +export const config = { + kafka: { + bootstrapBrokers: (process.env.KAFKA_BROKERS ?? 'localhost:9092').split(','), + clientId: 'cdc-load-test', + }, + crdb: { + connectionString: + process.env.CRDB_URL ?? 
'postgresql://root@localhost:26257/loadtest?sslmode=disable', + poolSize: 10, + }, + reportIntervalMs: 5_000, + drainTimeoutMs: 30_000, +} as const diff --git a/packages/kafka/load-tests/src/crdb-client.ts b/packages/kafka/load-tests/src/crdb-client.ts new file mode 100644 index 00000000..2022a713 --- /dev/null +++ b/packages/kafka/load-tests/src/crdb-client.ts @@ -0,0 +1,62 @@ +import pg from 'pg' +import { config } from './config.ts' + +export class CrdbClient { + private readonly pool: pg.Pool + + constructor() { + this.pool = new pg.Pool({ + connectionString: config.crdb.connectionString, + max: config.crdb.poolSize, + }) + } + + async insertEvents(count: number): Promise { + if (count === 0) return + + const values: unknown[] = [] + const placeholders: string[] = [] + + for (let i = 0; i < count; i++) { + const offset = i * 2 + placeholders.push(`(gen_random_uuid(), $${offset + 1}::STRING, $${offset + 2}::JSONB, now())`) + values.push( + `load_test_${i % 5}`, + JSON.stringify({ loadtest_ts: Date.now(), index: i, data: `event-payload-${i}` }), + ) + } + + await this.pool.query( + `INSERT INTO events (id, event_type, payload, created_at) VALUES ${placeholders.join(', ')}`, + values, + ) + } + + async insertOrders(count: number): Promise { + if (count === 0) return + + const values: unknown[] = [] + const placeholders: string[] = [] + + for (let i = 0; i < count; i++) { + const offset = i * 3 + placeholders.push( + `(gen_random_uuid(), $${offset + 1}::STRING, $${offset + 2}::DECIMAL, $${offset + 3}::STRING, now())`, + ) + values.push( + `customer-${(i % 100).toString().padStart(3, '0')}`, + (Math.random() * 1000).toFixed(2), + ['pending', 'confirmed', 'shipped', 'delivered'][i % 4], + ) + } + + await this.pool.query( + `INSERT INTO orders (id, customer_id, amount, status, created_at) VALUES ${placeholders.join(', ')}`, + values, + ) + } + + async close(): Promise { + await this.pool.end() + } +} diff --git a/packages/kafka/load-tests/src/direct-batch-consumer.ts 
b/packages/kafka/load-tests/src/direct-batch-consumer.ts new file mode 100644 index 00000000..1035180d --- /dev/null +++ b/packages/kafka/load-tests/src/direct-batch-consumer.ts @@ -0,0 +1,93 @@ +import { randomUUID } from 'node:crypto' +import { globalLogger } from '@lokalise/node-core' +import type { KafkaConsumerDependencies } from '@message-queue-toolkit/kafka' +import { + AbstractKafkaConsumer, + KafkaHandlerConfig, + KafkaHandlerRoutingBuilder, +} from '@message-queue-toolkit/kafka' +import { config } from './config.ts' +import { + DIRECT_EVENT_SCHEMA, + DIRECT_ORDER_SCHEMA, + type DIRECT_TOPICS_CONFIG, + type DirectEvent, +} from './direct-schemas.ts' +import type { MetricsCollector } from './metrics-collector.ts' + +type ExecutionContext = { + metrics: MetricsCollector +} + +export class DirectBatchConsumer extends AbstractKafkaConsumer< + typeof DIRECT_TOPICS_CONFIG, + ExecutionContext, + true +> { + constructor(metrics: MetricsCollector, batchSize = 50, timeoutMs = 200) { + const deps: KafkaConsumerDependencies = { + logger: globalLogger, + errorReporter: { report: () => {} }, + transactionObservabilityManager: { + start: () => {}, + stop: () => {}, + startWithGroup: () => {}, + addCustomAttributes: () => {}, + }, + } + + super( + deps, + { + kafka: { + bootstrapBrokers: config.kafka.bootstrapBrokers, + clientId: `direct-batch-consumer-${randomUUID()}`, + }, + groupId: `direct-batch-load-test-${randomUUID()}`, + batchProcessingEnabled: true, + batchProcessingOptions: { + batchSize, + timeoutMilliseconds: timeoutMs, + }, + handlers: new KafkaHandlerRoutingBuilder< + typeof DIRECT_TOPICS_CONFIG, + ExecutionContext, + true + >() + .addConfig( + 'direct-events', + new KafkaHandlerConfig(DIRECT_EVENT_SCHEMA, (messages, ctx) => { + console.log( + `[${new Date().toISOString()}] [batch-handler] direct-events batch received: ${messages.length} messages`, + ) + for (const message of messages) { + const value = message.value as DirectEvent + const loadtestTs = + 
typeof value.payload.loadtest_ts === 'number' + ? value.payload.loadtest_ts + : undefined + ctx.metrics.recordConsumed('direct-events', loadtestTs) + } + }), + ) + .addConfig( + 'direct-orders', + new KafkaHandlerConfig(DIRECT_ORDER_SCHEMA, (messages, ctx) => { + console.log( + `[${new Date().toISOString()}] [batch-handler] direct-orders batch received: ${messages.length} messages`, + ) + for (const message of messages) { + ctx.metrics.recordConsumed('direct-orders') + } + }), + ) + .build(), + autocreateTopics: true, + logMessages: true, + handlerSpy: false, + maxWaitTime: 5, + }, + { metrics }, + ) + } +} diff --git a/packages/kafka/load-tests/src/direct-batch-load-generator.ts b/packages/kafka/load-tests/src/direct-batch-load-generator.ts new file mode 100644 index 00000000..12a33c5e --- /dev/null +++ b/packages/kafka/load-tests/src/direct-batch-load-generator.ts @@ -0,0 +1,107 @@ +import { randomUUID } from 'node:crypto' +import { setTimeout } from 'node:timers/promises' +import { config } from './config.ts' +import { DirectBatchConsumer } from './direct-batch-consumer.ts' +import { DirectPublisher } from './direct-publisher.ts' +import type { DirectEvent, DirectOrder } from './direct-schemas.ts' +import { MetricsCollector } from './metrics-collector.ts' + +export interface BatchLoadTestOptions { + rate: number + duration: number + batchSize: number + consumerBatchSize: number + consumerBatchTimeoutMs: number +} + +function generateEvent(index: number): DirectEvent { + return { + id: randomUUID(), + event_type: `load_test_${index % 5}`, + payload: { loadtest_ts: Date.now(), index, data: `event-payload-${index}` }, + created_at: new Date().toISOString(), + } +} + +function generateOrder(index: number): DirectOrder { + return { + id: randomUUID(), + customer_id: `customer-${(index % 100).toString().padStart(3, '0')}`, + amount: (Math.random() * 1000).toFixed(2), + status: ['pending', 'confirmed', 'shipped', 'delivered'][index % 4]!, + created_at: new 
Date().toISOString(), + } +} + +export async function runDirectBatchLoadTest(options: BatchLoadTestOptions): Promise { + const { rate, duration, batchSize, consumerBatchSize, consumerBatchTimeoutMs } = options + + console.log( + `Starting direct Kafka batch load test: ${rate} msgs/sec, ${duration}s duration, publish batch=${batchSize}, consumer batch=${consumerBatchSize}, timeout=${consumerBatchTimeoutMs}ms`, + ) + + const metrics = new MetricsCollector() + const publisher = new DirectPublisher() + const consumer = new DirectBatchConsumer(metrics, consumerBatchSize, consumerBatchTimeoutMs) + + console.log('Initializing Kafka batch consumer and publisher...') + await Promise.all([consumer.init(), publisher.init()]) + console.log('Batch consumer and publisher ready.') + + const reportInterval = setInterval(() => metrics.report(), config.reportIntervalMs) + + const totalMessages = rate * duration + + console.log(`Publishing ${totalMessages.toLocaleString()} total messages at ${rate}/sec`) + + const loadStartTime = Date.now() + let totalPublished = 0 + + while (totalPublished < totalMessages) { + const remaining = totalMessages - totalPublished + const currentBatch = Math.min(batchSize, remaining) + const eventCount = Math.ceil(currentBatch / 2) + const orderCount = currentBatch - eventCount + + try { + const promises: Promise[] = [] + for (let i = 0; i < eventCount; i++) { + promises.push(publisher.publish('direct-events', generateEvent(totalPublished + i))) + } + for (let i = 0; i < orderCount; i++) { + promises.push( + publisher.publish('direct-orders', generateOrder(totalPublished + eventCount + i)), + ) + } + await Promise.all(promises) + totalPublished += currentBatch + metrics.recordProduced(currentBatch) + } catch (err) { + console.error('Publish error:', err) + } + + if (rate > 0) { + const elapsed = Date.now() - loadStartTime + const expectedElapsed = (totalPublished / rate) * 1000 + const sleepMs = expectedElapsed - elapsed + if (sleepMs > 0) { + await 
setTimeout(sleepMs) + } + } + } + + console.log(`\nPublishing complete. ${totalPublished.toLocaleString()} messages published.`) + console.log(`Waiting up to ${config.drainTimeoutMs / 1000}s for consumer to drain...`) + + const drainStart = Date.now() + while (metrics.backlog > 0 && Date.now() - drainStart < config.drainTimeoutMs) { + await setTimeout(500) + } + + clearInterval(reportInterval) + metrics.printFinalReport() + + console.log('Shutting down...') + await Promise.all([consumer.close(), publisher.close()]) + console.log('Done.') +} diff --git a/packages/kafka/load-tests/src/direct-consumer.ts b/packages/kafka/load-tests/src/direct-consumer.ts new file mode 100644 index 00000000..b43f27e5 --- /dev/null +++ b/packages/kafka/load-tests/src/direct-consumer.ts @@ -0,0 +1,78 @@ +import { randomUUID } from 'node:crypto' +import { globalLogger } from '@lokalise/node-core' +import type { KafkaConsumerDependencies } from '@message-queue-toolkit/kafka' +import { + AbstractKafkaConsumer, + KafkaHandlerConfig, + KafkaHandlerRoutingBuilder, +} from '@message-queue-toolkit/kafka' +import { config } from './config.ts' +import { + DIRECT_EVENT_SCHEMA, + DIRECT_ORDER_SCHEMA, + type DIRECT_TOPICS_CONFIG, + type DirectEvent, +} from './direct-schemas.ts' +import type { MetricsCollector } from './metrics-collector.ts' + +type ExecutionContext = { + metrics: MetricsCollector +} + +export class DirectConsumer extends AbstractKafkaConsumer< + typeof DIRECT_TOPICS_CONFIG, + ExecutionContext +> { + constructor(metrics: MetricsCollector) { + const deps: KafkaConsumerDependencies = { + logger: globalLogger, + errorReporter: { report: () => {} }, + transactionObservabilityManager: { + start: () => {}, + stop: () => {}, + startWithGroup: () => {}, + addCustomAttributes: () => {}, + }, + } + + super( + deps, + { + kafka: { + bootstrapBrokers: config.kafka.bootstrapBrokers, + clientId: `direct-consumer-${randomUUID()}`, + }, + groupId: `direct-load-test-${randomUUID()}`, + 
import { randomUUID } from 'node:crypto'
import { globalLogger } from '@lokalise/node-core'
import type { KafkaConsumerDependencies } from '@message-queue-toolkit/kafka'
import {
  AbstractKafkaConsumer,
  KafkaHandlerConfig,
  KafkaHandlerRoutingBuilder,
} from '@message-queue-toolkit/kafka'
import { config } from './config.ts'
import {
  DIRECT_EVENT_SCHEMA,
  DIRECT_ORDER_SCHEMA,
  type DIRECT_TOPICS_CONFIG,
  type DirectEvent,
} from './direct-schemas.ts'
import type { MetricsCollector } from './metrics-collector.ts'

// Handler execution context: only carries the shared metrics sink.
type ExecutionContext = {
  metrics: MetricsCollector
}

/**
 * Single-message consumer for the direct load test (batchProcessingEnabled:
 * false). Each consumed message is recorded in the shared MetricsCollector;
 * for 'direct-events' the embedded loadtest_ts is forwarded so end-to-end
 * latency can be computed.
 */
export class DirectConsumer extends AbstractKafkaConsumer<
  typeof DIRECT_TOPICS_CONFIG,
  ExecutionContext
> {
  constructor(metrics: MetricsCollector) {
    // Minimal no-op observability dependencies: this consumer only feeds metrics.
    const deps: KafkaConsumerDependencies = {
      logger: globalLogger,
      errorReporter: { report: () => {} },
      transactionObservabilityManager: {
        start: () => {},
        stop: () => {},
        startWithGroup: () => {},
        addCustomAttributes: () => {},
      },
    }

    super(
      deps,
      {
        kafka: {
          bootstrapBrokers: config.kafka.bootstrapBrokers,
          // Random suffixes give every run a fresh client id and consumer
          // group, so committed offsets never carry over between test runs.
          clientId: `direct-consumer-${randomUUID()}`,
        },
        groupId: `direct-load-test-${randomUUID()}`,
        batchProcessingEnabled: false,
        handlers: new KafkaHandlerRoutingBuilder<
          typeof DIRECT_TOPICS_CONFIG,
          ExecutionContext,
          false
        >()
          .addConfig(
            'direct-events',
            new KafkaHandlerConfig(DIRECT_EVENT_SCHEMA, (message, ctx) => {
              const value = message.value as DirectEvent
              // Guard: payload is a loose record, so the timestamp may be absent.
              const loadtestTs =
                typeof value.payload.loadtest_ts === 'number'
                  ? value.payload.loadtest_ts
                  : undefined
              ctx.metrics.recordConsumed('direct-events', loadtestTs)
            }),
          )
          .addConfig(
            'direct-orders',
            new KafkaHandlerConfig(DIRECT_ORDER_SCHEMA, (message, ctx) => {
              ctx.metrics.recordConsumed('direct-orders')
            }),
          )
          .build(),
        autocreateTopics: true,
        logMessages: false, // too noisy at load-test rates
        handlerSpy: false,
        maxWaitTime: 5,
      },
      { metrics },
    )
  }
}

import { randomUUID } from 'node:crypto'
import { setTimeout } from 'node:timers/promises'
import { config } from './config.ts'
import { DirectConsumer } from './direct-consumer.ts'
import { DirectPublisher } from './direct-publisher.ts'
import type { DirectEvent, DirectOrder } from './direct-schemas.ts'
import { MetricsCollector } from './metrics-collector.ts'

/** Options for the direct (publisher → Kafka → single-message consumer) load test. */
export interface LoadTestOptions {
  rate: number // target messages per second
  duration: number // seconds; total messages = rate * duration
  batchSize: number // messages published concurrently per iteration
}

/** Builds a synthetic event; `loadtest_ts` (epoch ms) enables latency measurement. */
function generateEvent(index: number): DirectEvent {
  return {
    id: randomUUID(),
    event_type: `load_test_${index % 5}`,
    payload: { loadtest_ts: Date.now(), index, data: `event-payload-${index}` },
    created_at: new Date().toISOString(),
  }
}

/** Builds a synthetic order cycling through 100 customer ids and 4 statuses. */
function generateOrder(index: number): DirectOrder {
  return {
    id: randomUUID(),
    customer_id: `customer-${(index % 100).toString().padStart(3, '0')}`,
    amount: (Math.random() * 1000).toFixed(2),
    status: ['pending', 'confirmed', 'shipped', 'delivered'][index % 4]!,
    created_at: new Date().toISOString(),
  }
}
'delivered'][index % 4]!, + created_at: new Date().toISOString(), + } +} + +export async function runDirectLoadTest(options: LoadTestOptions): Promise { + const { rate, duration, batchSize } = options + + console.log( + `Starting direct Kafka load test: ${rate} msgs/sec, ${duration}s duration, batch=${batchSize}`, + ) + + const metrics = new MetricsCollector() + const publisher = new DirectPublisher() + const consumer = new DirectConsumer(metrics) + + // Start consumer and publisher + console.log('Initializing Kafka consumer and publisher...') + await Promise.all([consumer.init(), publisher.init()]) + console.log('Consumer and publisher ready.') + + // Periodic reporting + const reportInterval = setInterval(() => metrics.report(), config.reportIntervalMs) + + // Generate load + const totalMessages = rate * duration + + console.log(`Publishing ${totalMessages.toLocaleString()} total messages at ${rate}/sec`) + + const loadStartTime = Date.now() + let totalPublished = 0 + + while (totalPublished < totalMessages) { + const remaining = totalMessages - totalPublished + const currentBatch = Math.min(batchSize, remaining) + + // Split evenly between events and orders + const eventCount = Math.ceil(currentBatch / 2) + const orderCount = currentBatch - eventCount + + try { + const promises: Promise[] = [] + for (let i = 0; i < eventCount; i++) { + promises.push(publisher.publish('direct-events', generateEvent(totalPublished + i))) + } + for (let i = 0; i < orderCount; i++) { + promises.push( + publisher.publish('direct-orders', generateOrder(totalPublished + eventCount + i)), + ) + } + await Promise.all(promises) + totalPublished += currentBatch + metrics.recordProduced(currentBatch) + } catch (err) { + console.error('Publish error:', err) + } + + // Throttle to target rate + const elapsed = Date.now() - loadStartTime + const expectedElapsed = (totalPublished / rate) * 1000 + const sleepMs = expectedElapsed - elapsed + if (sleepMs > 0) { + await setTimeout(sleepMs) + } + } 
+ + console.log(`\nPublishing complete. ${totalPublished.toLocaleString()} messages published.`) + console.log(`Waiting up to ${config.drainTimeoutMs / 1000}s for consumer to drain...`) + + // Wait for consumer to drain + const drainStart = Date.now() + while (metrics.backlog > 0 && Date.now() - drainStart < config.drainTimeoutMs) { + await setTimeout(500) + } + + // Final report + clearInterval(reportInterval) + metrics.printFinalReport() + + // Cleanup + console.log('Shutting down...') + await Promise.all([consumer.close(), publisher.close()]) + console.log('Done.') +} diff --git a/packages/kafka/load-tests/src/direct-publisher.ts b/packages/kafka/load-tests/src/direct-publisher.ts new file mode 100644 index 00000000..bd056ecb --- /dev/null +++ b/packages/kafka/load-tests/src/direct-publisher.ts @@ -0,0 +1,25 @@ +import { randomUUID } from 'node:crypto' +import { globalLogger } from '@lokalise/node-core' +import { AbstractKafkaPublisher, type KafkaDependencies } from '@message-queue-toolkit/kafka' +import { config } from './config.ts' +import { DIRECT_TOPICS_CONFIG } from './direct-schemas.ts' + +export class DirectPublisher extends AbstractKafkaPublisher { + constructor() { + const deps: KafkaDependencies = { + logger: globalLogger, + errorReporter: { report: () => {} }, + } + + super(deps, { + kafka: { + bootstrapBrokers: config.kafka.bootstrapBrokers, + clientId: `direct-publisher-${randomUUID()}`, + }, + topicsConfig: DIRECT_TOPICS_CONFIG, + autocreateTopics: true, + logMessages: false, + handlerSpy: false, + }) + } +} diff --git a/packages/kafka/load-tests/src/direct-schemas.ts b/packages/kafka/load-tests/src/direct-schemas.ts new file mode 100644 index 00000000..811f331b --- /dev/null +++ b/packages/kafka/load-tests/src/direct-schemas.ts @@ -0,0 +1,24 @@ +import type { TopicConfig } from '@message-queue-toolkit/kafka' +import z from 'zod/v4' + +export const DIRECT_EVENT_SCHEMA = z.object({ + id: z.string(), + event_type: z.string(), + payload: 
import type { TopicConfig } from '@message-queue-toolkit/kafka'
import z from 'zod/v4'

// Schema for the synthetic "event" messages published during load tests.
export const DIRECT_EVENT_SCHEMA = z.object({
  id: z.string(),
  event_type: z.string(),
  // Loose record: carries loadtest_ts (epoch ms) used for latency measurement.
  payload: z.record(z.string(), z.unknown()),
  created_at: z.string(),
})
export type DirectEvent = z.output<typeof DIRECT_EVENT_SCHEMA>

// Schema for the synthetic "order" messages.
export const DIRECT_ORDER_SCHEMA = z.object({
  id: z.string(),
  customer_id: z.string(),
  amount: z.string(),
  status: z.string(),
  created_at: z.string(),
})
export type DirectOrder = z.output<typeof DIRECT_ORDER_SCHEMA>

// Topic → schema routing shared by the publisher and both consumers.
export const DIRECT_TOPICS_CONFIG = [
  { topic: 'direct-events', schema: DIRECT_EVENT_SCHEMA },
  { topic: 'direct-orders', schema: DIRECT_ORDER_SCHEMA },
] as const satisfies TopicConfig[]

import { setTimeout } from 'node:timers/promises'
import { CdcConsumer } from './cdc-consumer.ts'
import { config } from './config.ts'
import { CrdbClient } from './crdb-client.ts'
import { MetricsCollector } from './metrics-collector.ts'

/** Options for the CDC load test (CockroachDB inserts → changefeed → Kafka). */
export interface LoadTestOptions {
  rate: number // target rows per second inserted into CockroachDB
  duration: number // seconds; total rows = rate * duration
  batchSize: number // rows per insert batch
}

/**
 * Drives the CDC load test: inserts rows into CockroachDB at the target rate
 * (the changefeed forwards them to Kafka), then waits for the consumer to
 * drain before printing the final report.
 */
export async function runLoadTest(options: LoadTestOptions): Promise<void> {
  const { rate, duration, batchSize } = options

  console.log(`Starting CDC load test: ${rate} rows/sec, ${duration}s duration, batch=${batchSize}`)

  const metrics = new MetricsCollector()
  const crdb = new CrdbClient()
  const consumer = new CdcConsumer(metrics)

  // Start consumer
  console.log('Initializing Kafka consumer...')
  await consumer.init()
  console.log('Consumer ready.')

  // Periodic reporting
  const reportInterval = setInterval(() => metrics.report(), config.reportIntervalMs)

  // Generate load
  const totalRows = rate * duration
  const batchesPerSecond = rate / batchSize
  const delayMs = batchesPerSecond > 0 ? 1000 / batchesPerSecond : 1000

  console.log(
    `Generating ${totalRows.toLocaleString()} total rows (${batchesPerSecond.toFixed(1)} batches/sec, ${delayMs.toFixed(0)}ms between batches)`,
  )

  const loadStartTime = Date.now()
  let totalInserted = 0
  let inflight = 0
  const maxInflight = 10 // cap concurrent batch inserts

  while (totalInserted < totalRows) {
    // Wait if too many batches in flight
    while (inflight >= maxInflight) {
      await setTimeout(5)
    }

    const remaining = totalRows - totalInserted
    const currentBatch = Math.min(batchSize, remaining)
    const eventCount = Math.ceil(currentBatch / 2)
    const orderCount = currentBatch - eventCount

    // Counted optimistically before the insert resolves: pacing is by
    // attempted rows. recordProduced only fires on success (in .then below),
    // so the consumed/produced metrics stay honest even if an insert fails.
    totalInserted += currentBatch
    inflight++

    // Deliberately not awaited so up to maxInflight batches overlap;
    // errors are logged, not retried.
    Promise.all([crdb.insertEvents(eventCount), crdb.insertOrders(orderCount)])
      .then(() => metrics.recordProduced(currentBatch))
      .catch((err) => console.error('Insert error:', err))
      .finally(() => {
        inflight--
      })

    // Throttle to target rate (0 = no throttle = max speed)
    if (rate > 0) {
      const elapsed = Date.now() - loadStartTime
      const expectedElapsed = (totalInserted / rate) * 1000
      const sleepMs = expectedElapsed - elapsed
      if (sleepMs > 0) {
        await setTimeout(sleepMs)
      }
    }
  }

  // Wait for all in-flight inserts to complete
  while (inflight > 0) {
    await setTimeout(10)
  }

  console.log(`\nLoad generation complete. ${totalInserted.toLocaleString()} rows inserted.`)
  console.log(`Waiting up to ${config.drainTimeoutMs / 1000}s for consumer to drain...`)

  // Wait for consumer to drain
  const drainStart = Date.now()
  while (metrics.backlog > 0 && Date.now() - drainStart < config.drainTimeoutMs) {
    await setTimeout(500)
  }

  // Final report
  clearInterval(reportInterval)
  metrics.printFinalReport()

  // Cleanup
  console.log('Shutting down...')
  await consumer.close()
  await crdb.close()
  console.log('Done.')
}
${totalInserted.toLocaleString()} rows inserted.`) + console.log(`Waiting up to ${config.drainTimeoutMs / 1000}s for consumer to drain...`) + + // Wait for consumer to drain + const drainStart = Date.now() + while (metrics.backlog > 0 && Date.now() - drainStart < config.drainTimeoutMs) { + await setTimeout(500) + } + + // Final report + clearInterval(reportInterval) + metrics.printFinalReport() + + // Cleanup + console.log('Shutting down...') + await consumer.close() + await crdb.close() + console.log('Done.') +} diff --git a/packages/kafka/load-tests/src/metrics-collector.ts b/packages/kafka/load-tests/src/metrics-collector.ts new file mode 100644 index 00000000..a1711a8a --- /dev/null +++ b/packages/kafka/load-tests/src/metrics-collector.ts @@ -0,0 +1,107 @@ +export class MetricsCollector { + private producedTotal = 0 + private consumedTotal = 0 + private windowProduced = 0 + private windowConsumed = 0 + private readonly perTopicConsumed = new Map() + private readonly latencies: number[] = [] + + private readonly startTime = Date.now() + private lastReportTime = Date.now() + + recordProduced(count: number): void { + this.producedTotal += count + this.windowProduced += count + } + + recordConsumed(topic: string, loadtestTs?: number): void { + this.consumedTotal++ + this.windowConsumed++ + this.perTopicConsumed.set(topic, (this.perTopicConsumed.get(topic) ?? 0) + 1) + + if (loadtestTs) { + this.latencies.push(Date.now() - loadtestTs) + } + } + + get backlog(): number { + return this.producedTotal - this.consumedTotal + } + + get totalConsumed(): number { + return this.consumedTotal + } + + get totalProduced(): number { + return this.producedTotal + } + + report(): void { + const now = Date.now() + const windowSec = (now - this.lastReportTime) / 1000 + const elapsedSec = Math.round((now - this.startTime) / 1000) + + const producedRate = windowSec > 0 ? Math.round(this.windowProduced / windowSec) : 0 + const consumedRate = windowSec > 0 ? 
Math.round(this.windowConsumed / windowSec) : 0 + + const latencyStats = this.computeLatencyStats() + + const topicBreakdown = [...this.perTopicConsumed.entries()] + .map(([t, c]) => `${t}=${c.toLocaleString()}`) + .join(' | ') + + console.log(` +=== Load Test Report (t=${elapsedSec}s) === + Produced: ${this.producedTotal.toLocaleString()} total | ${producedRate}/sec + Consumed: ${this.consumedTotal.toLocaleString()} total | ${consumedRate}/sec + Backlog: ${this.backlog.toLocaleString()} messages + Latency: avg=${latencyStats.avg}ms | p50=${latencyStats.p50}ms | p95=${latencyStats.p95}ms | p99=${latencyStats.p99}ms + Per Topic: ${topicBreakdown || 'n/a'} +================================ +`) + + this.windowProduced = 0 + this.windowConsumed = 0 + this.lastReportTime = now + } + + printFinalReport(): void { + const elapsedSec = (Date.now() - this.startTime) / 1000 + + const latencyStats = this.computeLatencyStats() + + const topicBreakdown = [...this.perTopicConsumed.entries()] + .map(([t, c]) => `${t}=${c.toLocaleString()}`) + .join(' | ') + + console.log(` +╔══════════════════════════════════════╗ +║ FINAL LOAD TEST REPORT ║ +╠══════════════════════════════════════╣ + Duration: ${elapsedSec.toFixed(1)}s + Produced: ${this.producedTotal.toLocaleString()} total | ${Math.round(this.producedTotal / elapsedSec)}/sec avg + Consumed: ${this.consumedTotal.toLocaleString()} total | ${Math.round(this.consumedTotal / elapsedSec)}/sec avg + Backlog: ${this.backlog.toLocaleString()} messages + Latency: avg=${latencyStats.avg}ms | p50=${latencyStats.p50}ms | p95=${latencyStats.p95}ms | p99=${latencyStats.p99}ms + Per Topic: ${topicBreakdown || 'n/a'} + Samples: ${this.latencies.length.toLocaleString()} latency measurements +╚══════════════════════════════════════╝ +`) + } + + private computeLatencyStats(): { avg: number; p50: number; p95: number; p99: number } { + if (this.latencies.length === 0) { + return { avg: 0, p50: 0, p95: 0, p99: 0 } + } + + const sorted = 
[...this.latencies].sort((a, b) => a - b) + const sum = sorted.reduce((s, v) => s + v, 0) + + return { + avg: Math.round(sum / sorted.length), + p50: sorted[Math.floor(sorted.length * 0.5)]!, + p95: sorted[Math.floor(sorted.length * 0.95)]!, + p99: sorted[Math.floor(sorted.length * 0.99)]!, + } + } +} diff --git a/packages/kafka/load-tests/src/run-batch.ts b/packages/kafka/load-tests/src/run-batch.ts new file mode 100644 index 00000000..67ab7bb4 --- /dev/null +++ b/packages/kafka/load-tests/src/run-batch.ts @@ -0,0 +1,21 @@ +import { parseArgs } from 'node:util' +import { runBatchLoadTest } from './batch-load-generator.ts' + +const { values } = parseArgs({ + options: { + rate: { type: 'string', short: 'r', default: '1000' }, + duration: { type: 'string', short: 'd', default: '60' }, + batch: { type: 'string', short: 'b', default: '100' }, + 'consumer-batch': { type: 'string', default: '50' }, + 'consumer-timeout': { type: 'string', default: '200' }, + }, + strict: true, +}) + +await runBatchLoadTest({ + rate: Number.parseInt(values.rate!, 10), + duration: Number.parseInt(values.duration!, 10), + batchSize: Number.parseInt(values.batch!, 10), + consumerBatchSize: Number.parseInt(values['consumer-batch']!, 10), + consumerBatchTimeoutMs: Number.parseInt(values['consumer-timeout']!, 10), +}) diff --git a/packages/kafka/load-tests/src/run-direct-batch.ts b/packages/kafka/load-tests/src/run-direct-batch.ts new file mode 100644 index 00000000..4d54cd0b --- /dev/null +++ b/packages/kafka/load-tests/src/run-direct-batch.ts @@ -0,0 +1,21 @@ +import { parseArgs } from 'node:util' +import { runDirectBatchLoadTest } from './direct-batch-load-generator.ts' + +const { values } = parseArgs({ + options: { + rate: { type: 'string', short: 'r', default: '1000' }, + duration: { type: 'string', short: 'd', default: '60' }, + batch: { type: 'string', short: 'b', default: '100' }, + 'consumer-batch': { type: 'string', default: '50' }, + 'consumer-timeout': { type: 'string', default: 
'200' }, + }, + strict: true, +}) + +await runDirectBatchLoadTest({ + rate: Number.parseInt(values.rate!, 10), + duration: Number.parseInt(values.duration!, 10), + batchSize: Number.parseInt(values.batch!, 10), + consumerBatchSize: Number.parseInt(values['consumer-batch']!, 10), + consumerBatchTimeoutMs: Number.parseInt(values['consumer-timeout']!, 10), +}) diff --git a/packages/kafka/load-tests/src/run-direct.ts b/packages/kafka/load-tests/src/run-direct.ts new file mode 100644 index 00000000..3c81f494 --- /dev/null +++ b/packages/kafka/load-tests/src/run-direct.ts @@ -0,0 +1,17 @@ +import { parseArgs } from 'node:util' +import { runDirectLoadTest } from './direct-load-generator.ts' + +const { values } = parseArgs({ + options: { + rate: { type: 'string', short: 'r', default: '1000' }, + duration: { type: 'string', short: 'd', default: '60' }, + batch: { type: 'string', short: 'b', default: '100' }, + }, + strict: true, +}) + +await runDirectLoadTest({ + rate: Number.parseInt(values.rate!, 10), + duration: Number.parseInt(values.duration!, 10), + batchSize: Number.parseInt(values.batch!, 10), +}) diff --git a/packages/kafka/load-tests/src/run.ts b/packages/kafka/load-tests/src/run.ts new file mode 100644 index 00000000..25a4cdd1 --- /dev/null +++ b/packages/kafka/load-tests/src/run.ts @@ -0,0 +1,17 @@ +import { parseArgs } from 'node:util' +import { runLoadTest } from './load-generator.ts' + +const { values } = parseArgs({ + options: { + rate: { type: 'string', short: 'r', default: '1000' }, + duration: { type: 'string', short: 'd', default: '60' }, + batch: { type: 'string', short: 'b', default: '100' }, + }, + strict: true, +}) + +await runLoadTest({ + rate: Number.parseInt(values.rate!, 10), + duration: Number.parseInt(values.duration!, 10), + batchSize: Number.parseInt(values.batch!, 10), +}) diff --git a/packages/kafka/load-tests/tsconfig.json b/packages/kafka/load-tests/tsconfig.json new file mode 100644 index 00000000..a3b04095 --- /dev/null +++ 
b/packages/kafka/load-tests/tsconfig.json @@ -0,0 +1,17 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "NodeNext", + "moduleResolution": "NodeNext", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "allowImportingTsExtensions": true, + "rewriteRelativeImportExtensions": true, + "forceConsistentCasingInFileNames": true, + "resolveJsonModule": true, + "declaration": false, + "outDir": "dist" + }, + "include": ["src"] +}