Skip to content

Refactor benchmark dispatch from per-tenant to orchestrator-level using Flux concurrency#48998

Open
xinlian12 wants to merge 7 commits intoAzure:mainfrom
xinlian12:feat/benchmark-orchestrator-dispatch
Open

Refactor benchmark dispatch from per-tenant to orchestrator-level using Flux concurrency#48998
xinlian12 wants to merge 7 commits intoAzure:mainfrom
xinlian12:feat/benchmark-orchestrator-dispatch

Conversation

@xinlian12
Copy link
Copy Markdown
Member

@xinlian12 xinlian12 commented Apr 30, 2026

Problem

Currently each tenant/benchmark independently controls its own concurrency and max operation count (via per-tenant Flux/ExecutorService). As more tenants are added, maintaining concurrency config across all of them becomes painful and error-prone.

Solution

Centralize dispatch at the orchestrator level: a single Flux.generate().flatMap(benchmark.next(), globalConcurrency) drives all operations, randomly selecting which tenant executes each slot.

Architecture

BEFORE:  Orchestrator -> ExecutorService (1 thread/tenant) -> each benchmark.run() with per-tenant concurrency
AFTER:   Orchestrator -> Flux.generate(totalOps).flatMap(randomBenchmark.next(), globalConcurrency)

Changes

Core Design

  • Benchmark.java - Interface now has Mono<Void> next() (execute one operation) + shutdown(). Removed run() and supportsOrchestratorDispatch().
  • BenchmarkOrchestrator.java - runWorkload() uses a single Flux with ThreadLocalRandom tenant selection and doFinally for operation counting. Removed ExecutorService.
  • BenchmarkConfig.java - concurrency, numberOfOperations, and maxRunningTimeDuration are now required top-level fields in the workload config JSON. Validates concurrency > 0.

Benchmark Adaptations

  • AsyncBenchmark.java - next() delegates to performWorkload() via AtomicLong counter. Removed per-benchmark Scheduler.
  • SyncBenchmark.java - next() wraps blocking performWorkload() in Mono.fromCallable().subscribeOn(Schedulers.boundedElastic()). Removed ExecutorService, Semaphore, ResultHandler.
  • AsyncEncryptionBenchmark.java / AsyncCtlWorkload.java - Same pattern, removed scheduler.
  • LICtlWorkload.java - next() wraps full lifecycle in Mono.fromRunnable().

Config Format

{
  "concurrency": 100,
  "numberOfOperations": 500000,
  "tenantDefaults": { ... },
  "tenants": [ ... ]
}

Testing

  • mvn compile test-compile -pl . -am -DskipTests passes clean
  • Updated WorkflowTest and ReadMyWritesConsistencyTest for new constructor signatures

…ng Flux concurrency

- Add Mono<Void> next() method to Benchmark interface for orchestrator-level dispatch
- Add supportsOrchestratorDispatch() to Benchmark interface for dispatch routing
- Implement next() in AsyncBenchmark, SyncBenchmark, AsyncEncryptionBenchmark, AsyncCtlWorkload
- SyncBenchmark.next() wraps blocking performWorkload() in Mono.fromCallable + Schedulers.boundedElastic()
- Rewrite BenchmarkOrchestrator.runWorkload() to use Flux.flatMap(fn, globalConcurrency)
  with random tenant selection instead of per-tenant ExecutorService dispatch
- Add orchestrator-level concurrency/numberOfOperations/maxRunningTimeDuration to BenchmarkConfig
  loaded from new 'orchestrator' section in workload config JSON
- Fall back to legacy per-tenant run() dispatch for benchmarks that don't support
  orchestrator dispatch (e.g., LICtlWorkload)
- Maintain full backward compatibility: existing workload config files work unchanged
  (orchestrator config falls back to sum of tenant concurrencies/operations)
- Constructors, shutdown(), and data pre-population are unchanged
Copilot AI review requested due to automatic review settings April 30, 2026 19:58
@xinlian12 xinlian12 requested review from a team and kirankumarkolli as code owners April 30, 2026 19:58
@xinlian12 xinlian12 changed the title Refactor benchmark dispatch from per-tenant to orchestrator-level using Flux concurrency [NO REVIEW]Refactor benchmark dispatch from per-tenant to orchestrator-level using Flux concurrency Apr 30, 2026
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Refactors benchmark execution so the orchestrator centrally controls concurrency and operation dispatch using Reactor Flux/Mono, while retaining a legacy per-benchmark run() path for benchmarks that can’t participate.

Changes:

  • Introduces Benchmark.next() + supportsOrchestratorDispatch() to enable orchestrator-driven, per-operation dispatch.
  • Reworks BenchmarkOrchestrator.runWorkload() to partition benchmarks and drive orchestrator-capable ones via Flux.generate(...).flatMap(..., globalConcurrency).
  • Adds orchestrator-level config (concurrency, numberOfOperations, maxRunningTimeDuration) with fallbacks, and updates the sample workload JSON.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
sdk/cosmos/azure-cosmos-benchmark/workload-config-sample.json Adds example orchestrator section for centralized concurrency/ops.
sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Benchmark.java Adds next() + capability flag for orchestrator dispatch.
sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java Implements next() for async benchmarks using an internal counter.
sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncBenchmark.java Implements next() for sync benchmarks via Mono.fromCallable(...).boundedElastic().
sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/encryption/AsyncEncryptionBenchmark.java Implements next() for encryption benchmark similarly to AsyncBenchmark.
sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ctl/AsyncCtlWorkload.java Implements next() to allow orchestrator dispatch for CTL async workload.
sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkConfig.java Adds orchestrator-level config parsing + fallback getters.
sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkOrchestrator.java Centralizes concurrency and operation dispatch via Flux; keeps legacy path for unsupported benchmarks.

- Remove run() from Benchmark interface, all dispatch goes through next()
- Remove supportsOrchestratorDispatch() routing and legacy fallback path
- Remove ExecutorService from BenchmarkOrchestrator and SyncBenchmark
- Remove Semaphore and ResultHandler from SyncBenchmark
- Remove per-tenant Flux dispatch loops from AsyncBenchmark, AsyncEncryptionBenchmark, AsyncCtlWorkload
- Make orchestrator config section required (concurrency + numberOfOperations/maxRunningTimeDuration)
- Remove fallback-to-tenant-sum logic in BenchmarkConfig getters

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…LocalRandom/doFinally

- Validate concurrency > 0 at parse time
- Use doFinally instead of doOnSuccess/doOnError for completedCount
- Use ThreadLocalRandom.current() instead of shared Random
- Fix fully-qualified AtomicLong in AsyncCtlWorkload
- Remove per-benchmark Scheduler - orchestrator Flux handles scheduling
- Remove benchmarkScheduler field/param from all async benchmarks
- Flatten orchestrator config: concurrency/numberOfOperations/maxRunningTimeDuration at top level
- Use Duration import instead of fully-qualified java.time.Duration
- Fix LICtlWorkload: wrap run() in Mono.fromRunnable for next()
- Update tests to use new constructor signatures and next().block()

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@xinlian12 xinlian12 changed the title [NO REVIEW]Refactor benchmark dispatch from per-tenant to orchestrator-level using Flux concurrency Refactor benchmark dispatch from per-tenant to orchestrator-level using Flux concurrency Apr 30, 2026
@xinlian12
Copy link
Copy Markdown
Member Author

@sdkReviewAgent

@xinlian12
Copy link
Copy Markdown
Member Author

@sdkReviewAgent-2

Annie Liang and others added 2 commits April 30, 2026 14:51
… tests

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
LICtlWorkload's lifecycle model (resource creation + data loading + test execution
in a single run() call) is incompatible with the new per-operation next() dispatch.
Rather than maintaining a complex compatibility shim, remove it entirely.
The linkedin/ package was exclusively used by LICtlWorkload.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Comment thread sdk/cosmos/azure-cosmos-benchmark/workload-config-sample.json
@xinlian12
Copy link
Copy Markdown
Member Author

Review complete (43:40)

Posted 5 inline comment(s).

Steps: ✓ context, correctness, cross-sdk, design, history, past-prs, synthesis, test-coverage

…docs

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Dispatch concurrency is orchestrator-level only. Per-tenant concurrency
was only used for bulk ingestion error retry paths  a fixed default
of 5 is sufficient.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants