Refactor benchmark dispatch from per-tenant to orchestrator-level using Flux concurrency#48998
Open
xinlian12 wants to merge 7 commits intoAzure:mainfrom
Open
Refactor benchmark dispatch from per-tenant to orchestrator-level using Flux concurrency#48998xinlian12 wants to merge 7 commits intoAzure:mainfrom
xinlian12 wants to merge 7 commits intoAzure:mainfrom
Conversation
…ng Flux concurrency - Add Mono<Void> next() method to Benchmark interface for orchestrator-level dispatch - Add supportsOrchestratorDispatch() to Benchmark interface for dispatch routing - Implement next() in AsyncBenchmark, SyncBenchmark, AsyncEncryptionBenchmark, AsyncCtlWorkload - SyncBenchmark.next() wraps blocking performWorkload() in Mono.fromCallable + Schedulers.boundedElastic() - Rewrite BenchmarkOrchestrator.runWorkload() to use Flux.flatMap(fn, globalConcurrency) with random tenant selection instead of per-tenant ExecutorService dispatch - Add orchestrator-level concurrency/numberOfOperations/maxRunningTimeDuration to BenchmarkConfig loaded from new 'orchestrator' section in workload config JSON - Fall back to legacy per-tenant run() dispatch for benchmarks that don't support orchestrator dispatch (e.g., LICtlWorkload) - Maintain full backward compatibility: existing workload config files work unchanged (orchestrator config falls back to sum of tenant concurrencies/operations) - Constructors, shutdown(), and data pre-population are unchanged
Contributor
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Refactors benchmark execution so the orchestrator centrally controls concurrency and operation dispatch using Reactor Flux/Mono, while retaining a legacy per-benchmark run() path for benchmarks that can’t participate.
Changes:
- Introduces
Benchmark.next()+supportsOrchestratorDispatch()to enable orchestrator-driven, per-operation dispatch. - Reworks
BenchmarkOrchestrator.runWorkload()to partition benchmarks and drive orchestrator-capable ones viaFlux.generate(...).flatMap(..., globalConcurrency). - Adds orchestrator-level config (
concurrency,numberOfOperations,maxRunningTimeDuration) with fallbacks, and updates the sample workload JSON.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| sdk/cosmos/azure-cosmos-benchmark/workload-config-sample.json | Adds example orchestrator section for centralized concurrency/ops. |
| sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Benchmark.java | Adds next() + capability flag for orchestrator dispatch. |
| sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java | Implements next() for async benchmarks using an internal counter. |
| sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncBenchmark.java | Implements next() for sync benchmarks via Mono.fromCallable(...).boundedElastic(). |
| sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/encryption/AsyncEncryptionBenchmark.java | Implements next() for encryption benchmark similarly to AsyncBenchmark. |
| sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ctl/AsyncCtlWorkload.java | Implements next() to allow orchestrator dispatch for CTL async workload. |
| sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkConfig.java | Adds orchestrator-level config parsing + fallback getters. |
| sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkOrchestrator.java | Centralizes concurrency and operation dispatch via Flux; keeps legacy path for unsupported benchmarks. |
- Remove run() from Benchmark interface, all dispatch goes through next() - Remove supportsOrchestratorDispatch() routing and legacy fallback path - Remove ExecutorService from BenchmarkOrchestrator and SyncBenchmark - Remove Semaphore and ResultHandler from SyncBenchmark - Remove per-tenant Flux dispatch loops from AsyncBenchmark, AsyncEncryptionBenchmark, AsyncCtlWorkload - Make orchestrator config section required (concurrency + numberOfOperations/maxRunningTimeDuration) - Remove fallback-to-tenant-sum logic in BenchmarkConfig getters Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
xinlian12
commented
Apr 30, 2026
xinlian12
commented
Apr 30, 2026
xinlian12
commented
Apr 30, 2026
xinlian12
commented
Apr 30, 2026
xinlian12
commented
Apr 30, 2026
…LocalRandom/doFinally - Validate concurrency > 0 at parse time - Use doFinally instead of doOnSuccess/doOnError for completedCount - Use ThreadLocalRandom.current() instead of shared Random - Fix fully-qualified AtomicLong in AsyncCtlWorkload - Remove per-benchmark Scheduler - orchestrator Flux handles scheduling - Remove benchmarkScheduler field/param from all async benchmarks - Flatten orchestrator config: concurrency/numberOfOperations/maxRunningTimeDuration at top level - Use Duration import instead of fully-qualified java.time.Duration - Fix LICtlWorkload: wrap run() in Mono.fromRunnable for next() - Update tests to use new constructor signatures and next().block() Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
xinlian12
commented
Apr 30, 2026
Member
Author
|
@sdkReviewAgent |
xinlian12
commented
Apr 30, 2026
Member
Author
|
@sdkReviewAgent-2 |
… tests Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
LICtlWorkload's lifecycle model (resource creation + data loading + test execution in a single run() call) is incompatible with the new per-operation next() dispatch. Rather than maintaining a complex compatibility shim, remove it entirely. The linkedin/ package was exclusively used by LICtlWorkload. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
xinlian12
commented
Apr 30, 2026
xinlian12
commented
Apr 30, 2026
Member
Author
|
✅ Review complete (43:40) Posted 5 inline comment(s). Steps: ✓ context, correctness, cross-sdk, design, history, past-prs, synthesis, test-coverage |
…docs Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Dispatch concurrency is orchestrator-level only. Per-tenant concurrency was only used for bulk ingestion error retry paths a fixed default of 5 is sufficient. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
Currently each tenant/benchmark independently controls its own concurrency and max operation count (via per-tenant
Flux/ExecutorService). As more tenants are added, maintaining concurrency config across all of them becomes painful and error-prone.Solution
Centralize dispatch at the orchestrator level: a single
Flux.generate().flatMap(benchmark.next(), globalConcurrency)drives all operations, randomly selecting which tenant executes each slot.Architecture
Changes
Core Design
Mono<Void> next()(execute one operation) +shutdown(). Removedrun()andsupportsOrchestratorDispatch().runWorkload()uses a single Flux withThreadLocalRandomtenant selection anddoFinallyfor operation counting. RemovedExecutorService.concurrency,numberOfOperations, andmaxRunningTimeDurationare now required top-level fields in the workload config JSON. Validatesconcurrency > 0.Benchmark Adaptations
next()delegates toperformWorkload()viaAtomicLongcounter. Removed per-benchmarkScheduler.next()wraps blockingperformWorkload()inMono.fromCallable().subscribeOn(Schedulers.boundedElastic()). RemovedExecutorService,Semaphore,ResultHandler.next()wraps full lifecycle inMono.fromRunnable().Config Format
{ "concurrency": 100, "numberOfOperations": 500000, "tenantDefaults": { ... }, "tenants": [ ... ] }Testing
mvn compile test-compile -pl . -am -DskipTestspasses cleanWorkflowTestandReadMyWritesConsistencyTestfor new constructor signatures