2 changes: 1 addition & 1 deletion biome.json
@@ -7,7 +7,7 @@
"@lokalise/biome-config/configs/biome-package.jsonc"
],
"files": {
"includes": ["**", "!.turbo", "!**/dist", "!**/coverage"]
"includes": ["**", "!.turbo", "!**/dist", "!**/coverage", "!**/load-tests"]
},
"linter": {
"rules": {
1 change: 1 addition & 0 deletions packages/kafka/load-tests/.gitattributes
@@ -0,0 +1 @@
scripts/*.sh text eol=lf
114 changes: 114 additions & 0 deletions packages/kafka/load-tests/README.md
@@ -0,0 +1,114 @@
# Kafka Load Tests

Load tests for `@message-queue-toolkit/kafka` measuring throughput, latency, and backlog under configurable load.

Two producer modes and two consumer modes can be combined:

Producer modes:

- **CDC**: CockroachDB inserts → CDC changefeed → Kafka (realistic end-to-end)
- **Direct**: `AbstractKafkaPublisher` → Kafka (isolates Kafka consumer performance)

Consumer modes:

- **Single**: One message at a time (`batchProcessingEnabled: false`)
- **Batch**: Batched consumption via `KafkaMessageBatchStream` (`batchProcessingEnabled: true`)
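The two consumer modes differ only in configuration. A minimal sketch of the option shapes, where `batchProcessingEnabled` is the toolkit flag named above and the batch-tuning field names are hypothetical stand-ins for whatever `KafkaMessageBatchStream` is actually configured with:

```typescript
// Illustrative only: batchProcessingEnabled is the toolkit's flag; the
// batch-tuning field names below are hypothetical stand-ins.
const singleConsumerOptions = {
  batchProcessingEnabled: false,
};

const batchConsumerOptions = {
  batchProcessingEnabled: true,
  maxBatchSize: 50, // hypothetical; tuned via --consumer-batch
  batchTimeoutMs: 200, // hypothetical; tuned via --consumer-timeout
};
```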

## Prerequisites

- Docker & Docker Compose
- Node.js >= 22.14.0

## Quick Start

```bash
# Install dependencies
npm install

# Start infrastructure (Kafka + CockroachDB + CDC changefeed)
npm run docker:start

# Stop and clean up
npm run docker:stop
```

## Test Scripts

### CDC (CockroachDB → CDC → Kafka)

```bash
# Single-message consumer
npm run load:cdc:light # 100 rows/sec, 30s
npm run load:cdc:medium # 1000 rows/sec, 60s
npm run load:cdc:heavy # 5000 rows/sec, 120s
npm run load:cdc -- --rate 500 --duration 45 --batch 50

# Batch consumer
npm run load:cdc:batch:light # 100 rows/sec, 30s
npm run load:cdc:batch:medium # 1000 rows/sec, 60s
npm run load:cdc:batch:heavy # 5000 rows/sec, 120s
npm run load:cdc:batch -- --rate 500 --consumer-batch 100 --consumer-timeout 500
```

### Direct (Kafka publisher → Kafka)

```bash
# Single-message consumer
npm run load:direct:light # 100 msgs/sec, 30s
npm run load:direct:medium # 1000 msgs/sec, 60s
npm run load:direct:heavy # 5000 msgs/sec, 120s
npm run load:direct -- --rate 500 --duration 45 --batch 50

# Batch consumer
npm run load:direct:batch:light # 100 msgs/sec, 30s
npm run load:direct:batch:medium # 1000 msgs/sec, 60s
npm run load:direct:batch:heavy # 5000 msgs/sec, 120s
npm run load:direct:batch -- --rate 500 --consumer-batch 100 --consumer-timeout 500
```

## CLI Options

### Common

| Flag | Short | Default | Description |
|------|-------|---------|-------------|
| `--rate` | `-r` | 1000 | Target produce rate (rows or msgs per sec) |
| `--duration` | `-d` | 60 | Test duration (seconds) |
| `--batch` | `-b` | 100 | Producer batch size |
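The common flags above could be parsed with Node's built-in `node:util` `parseArgs`; this is a sketch of one plausible approach, not necessarily what the load-test scripts do:

```typescript
import { parseArgs } from "node:util";

// Stand-in argv; a real script would pass process.argv.slice(2).
const { values } = parseArgs({
  args: ["--rate", "500", "-d", "45"],
  options: {
    rate: { type: "string", short: "r", default: "1000" },
    duration: { type: "string", short: "d", default: "60" },
    batch: { type: "string", short: "b", default: "100" },
  },
});

const rate = Number(values.rate); // 500
const duration = Number(values.duration); // 45
const batch = Number(values.batch); // 100 (default applied)
```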

### Batch consumer only

| Flag | Default | Description |
|------|---------|-------------|
| `--consumer-batch` | 50 | Messages per consumer batch |
| `--consumer-timeout` | 200 | Batch flush timeout (ms) |
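These two flags express the usual size-or-timeout flush rule: a batch is delivered when it fills up, or when the timeout elapses with a partial batch. A generic sketch of that semantics (not the toolkit's implementation):

```typescript
// Generic size-or-timeout batcher: flush when maxBatchSize is reached,
// or flushTimeoutMs after the first message of a partial batch arrives.
function createBatcher<T>(
  maxBatchSize: number,
  flushTimeoutMs: number,
  onFlush: (batch: T[]) => void,
) {
  let buffer: T[] = [];
  let timer: ReturnType<typeof setTimeout> | undefined;

  const flush = () => {
    if (timer) clearTimeout(timer);
    timer = undefined;
    if (buffer.length > 0) {
      onFlush(buffer);
      buffer = [];
    }
  };

  return {
    push(message: T) {
      buffer.push(message);
      if (buffer.length >= maxBatchSize) flush(); // size-triggered flush
      else if (!timer) timer = setTimeout(flush, flushTimeoutMs); // timeout-triggered
    },
    flush,
  };
}
```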

## Architecture

### CDC mode

```
Load Generator → CockroachDB (inserts) → CDC Changefeed → Kafka → Consumer → Metrics
```

1. **CockroachDB** tables (`events`, `orders`) with CDC changefeed targeting Kafka
2. **Load generator** inserts rows into CRDB at configurable rate with fire-and-forget concurrency
3. **CDC changefeed** publishes row changes to Kafka topics
4. **Consumer** (single or batch) processes messages and records metrics
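Step 2's rate-paced, fire-and-forget generation can be sketched as follows; `insertBatch` is a hypothetical stand-in for whatever writes `size` rows into CockroachDB:

```typescript
// Sketch of rate-paced, fire-and-forget load generation: issue one batch
// per tick without awaiting it, so a slow insert cannot drag the rate down.
async function runLoad(
  ratePerSec: number,
  durationSec: number,
  batchSize: number,
  insertBatch: (size: number) => Promise<void>, // hypothetical CRDB writer
): Promise<number> {
  const intervalMs = (batchSize / ratePerSec) * 1000; // one batch per tick
  const endAt = Date.now() + durationSec * 1000;
  let issued = 0;
  while (Date.now() < endAt) {
    insertBatch(batchSize).catch(() => {}); // fire and forget
    issued += batchSize;
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
  return issued;
}
```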

### Direct mode

```
Load Generator → AbstractKafkaPublisher → Kafka → Consumer → Metrics
```

1. **Publisher** sends messages directly to Kafka topics (`direct-events`, `direct-orders`)
2. **Consumer** (single or batch) processes messages and records metrics

## Services

| Service | Port | Description |
|---------|------|-------------|
| Kafka | 9092 | Message broker (KRaft, 6 partitions) |
| CockroachDB | 26257 | SQL database |
| CockroachDB UI | 8181 | DB admin console |
| Kafka UI | 8080 | Topic browser |

## Latency Measurement

Each `events` row embeds `{"loadtest_ts": <epoch_ms>}` in its payload. The consumer extracts this timestamp and computes end-to-end latency (insert/publish → consume). Reported as avg, p50, p95, p99.
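The reduction from raw samples to the reported figures can be sketched like this (a nearest-rank percentile over sorted samples; the scripts' exact method may differ):

```typescript
// Compute avg/p50/p95/p99 from end-to-end latency samples (ms), where each
// sample is Date.now() minus the loadtest_ts embedded in the payload.
function summarize(samples: number[]) {
  const sorted = [...samples].sort((a, b) => a - b);
  // Nearest-rank percentile, clamped to the last element.
  const pct = (p: number) =>
    sorted[Math.min(sorted.length - 1, Math.ceil((p / 100) * sorted.length) - 1)];
  return {
    avg: sorted.reduce((sum, v) => sum + v, 0) / sorted.length,
    p50: pct(50),
    p95: pct(95),
    p99: pct(99),
  };
}
```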
67 changes: 67 additions & 0 deletions packages/kafka/load-tests/docker-compose.yml
@@ -0,0 +1,67 @@
services:

kafka:
image: apache/kafka:3.7.1
ports:
- 9092:9092
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: LOCAL://0.0.0.0:9092,DOCKER://kafka:9093,CONTROLLER://localhost:9094
KAFKA_ADVERTISED_LISTENERS: LOCAL://localhost:9092,DOCKER://kafka:9093
KAFKA_INTER_BROKER_LISTENER_NAME: LOCAL
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,LOCAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9094
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 6
healthcheck:
test: /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
interval: 5s
timeout: 10s
retries: 10
start_period: 15s
restart: on-failure

cockroachdb:
image: cockroachdb/cockroach:latest-v25.1
command: start-single-node --insecure
ports:
- 26257:26257
- 8181:8080
depends_on:
kafka:
condition: service_healthy
healthcheck:
test: curl -f http://localhost:8080/health?ready=1 || exit 1
interval: 5s
timeout: 5s
retries: 10
start_period: 10s
restart: on-failure

crdb-init:
image: cockroachdb/cockroach:latest-v25.1
volumes:
- ./scripts/init-crdb.sh:/init-crdb.sh
entrypoint: ["/bin/bash", "/init-crdb.sh"]
depends_on:
cockroachdb:
condition: service_healthy
restart: "no"

kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
depends_on:
kafka:
condition: service_healthy
environment:
DYNAMIC_CONFIG_ENABLED: 'true'
KAFKA_CLUSTERS_0_NAME: LoadTest
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
restart: on-failure