Pull request overview
This pull request implements the Workload-ID feature for Azure Cosmos DB Java SDK, enabling customers to tag their database requests with a numeric identifier (1-50) that flows through to Azure Monitor metrics. This provides a lightweight alternative to diagnostic logs for monitoring the source of database requests.
Changes:
- Added customHeaders() method to CosmosClientBuilder for setting client-level custom headers
- Made setHeader() methods public on all request options classes (CosmosItemRequestOptions, CosmosBatchRequestOptions, CosmosBulkExecutionOptions, CosmosChangeFeedRequestOptions, CosmosQueryRequestOptions) for per-request header overrides
- Implemented workload-id header support in the RNTBD (Direct mode) protocol layer with proper error handling
- Integrated custom headers feature into Spark connector with JSON-based configuration
- Added comprehensive unit tests and E2E integration tests
Reviewed changes
Copilot reviewed 23 out of 23 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| CosmosClientBuilder.java | Added customHeaders() method for client-level custom headers configuration |
| CosmosItemRequestOptions.java | Made setHeader() public with updated documentation |
| CosmosQueryRequestOptions.java | Added new public setHeader() method that delegates to internal implementation |
| CosmosChangeFeedRequestOptions.java | Made setHeader() public with updated documentation |
| CosmosBulkExecutionOptions.java | Made setHeader() public with updated documentation |
| CosmosBatchRequestOptions.java | Made setHeader() public with updated documentation |
| RxDocumentClientImpl.java | Added constructor overload accepting customHeaders and merged them into request headers |
| AsyncDocumentClient.java | Added Builder support for customHeaders parameter |
| CosmosAsyncClient.java | Passed customHeaders from builder to AsyncDocumentClient |
| RntbdRequestHeaders.java | Added addWorkloadId() method with error handling for parsing and byte conversion |
| RntbdConstants.java | Added WorkloadId enum entry with ID 0x00DC as a Byte type token |
| HttpConstants.java | Added WORKLOAD_ID header constant definition |
| WorkloadIdE2ETests.java | Comprehensive E2E tests covering CRUD, query, and regression scenarios |
| RntbdWorkloadIdTests.java | Unit tests verifying RNTBD header definition and valid/invalid value handling |
| CustomHeadersTests.java | Unit tests for CosmosClientBuilder and request options public API surface |
| CosmosConfig.scala | Added CustomHeaders config entry with JSON parsing for Spark connector |
| CosmosClientConfiguration.scala | Added customHeaders field to configuration case class |
| CosmosClientCache.scala | Applied customHeaders to CosmosClientBuilder and included in cache key |
| SparkE2EWorkloadIdITest.scala | E2E tests for Spark connector read/write with custom headers |
| CosmosClientConfigurationSpec.scala | Unit tests for JSON parsing of customHeaders configuration |
| Multiple test files | Updated test fixtures to include customHeaders: None for backward compatibility |
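The CosmosClientCache.scala row notes that customHeaders became part of the cache key. A minimal sketch of why that matters, written in Java with a hypothetical `ClientCacheKey` class (not the connector's actual type; the endpoint string and header name are also placeholders): if the headers were left out of the key, two Spark configurations that differ only in workload-id would share one cached client and therefore one set of headers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

/** Hypothetical cache key; an illustration, not the Spark connector's class. */
final class ClientCacheKey {
    private final String endpoint;
    private final Map<String, String> customHeaders;

    ClientCacheKey(String endpoint, Map<String, String> customHeaders) {
        this.endpoint = Objects.requireNonNull(endpoint, "endpoint");
        // Defensive copy. Assumption: a null map means "no custom headers".
        this.customHeaders = customHeaders == null
            ? new TreeMap<>()
            : new TreeMap<>(customHeaders);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof ClientCacheKey)) {
            return false;
        }
        ClientCacheKey that = (ClientCacheKey) other;
        return endpoint.equals(that.endpoint) && customHeaders.equals(that.customHeaders);
    }

    @Override
    public int hashCode() {
        return Objects.hash(endpoint, customHeaders);
    }

    public static void main(String[] args) {
        Map<String, String> first = new HashMap<>();
        first.put("x-example-workload-id", "15"); // placeholder header name
        Map<String, String> second = new HashMap<>();
        second.put("x-example-workload-id", "30");

        ClientCacheKey a = new ClientCacheKey("https://example.invalid", first);
        ClientCacheKey b = new ClientCacheKey("https://example.invalid", second);
        ClientCacheKey c = new ClientCacheKey("https://example.invalid", new HashMap<>(first));

        System.out.println(a.equals(b)); // different headers, so different cached clients
        System.out.println(a.equals(c)); // same headers, so the cached client can be reused
    }
}
```

Treating a null map the same as an empty one is a choice made for this sketch; the Scala configuration uses an Option (customHeaders: None), which may distinguish the two cases.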
Comments suppressed due to low confidence (1)
sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/WorkloadIdE2ETests.java:327
- Test coverage is missing for batch and bulk operations with workload-id headers. Consider adding E2E tests similar to the existing CRUD tests that verify:
  - CosmosBatchRequestOptions.setHeader() works correctly with batch operations
  - CosmosBulkExecutionOptions.setHeader() works correctly with bulk operations

  This would ensure the workload-id feature works end-to-end for all operation types, not just point operations and queries.
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.rx;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.TestObject;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.TestConfigurations;
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
import com.azure.cosmos.models.CosmosBulkOperations;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.PartitionKeyDefinition;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * End-to-end integration tests for the custom headers / workload-id feature.
 * <p>
 * Test type: EMULATOR INTEGRATION TEST — requires the Cosmos DB Emulator to be running locally.
 */
public class WorkloadIdE2ETests extends TestSuiteBase {

    private static final String DATABASE_ID = "workloadIdTestDb-" + UUID.randomUUID();
    private static final String CONTAINER_ID = "workloadIdTestContainer-" + UUID.randomUUID();

    private CosmosAsyncClient clientWithWorkloadId;
    private CosmosAsyncDatabase database;
    private CosmosAsyncContainer container;

    public WorkloadIdE2ETests() {
        super(new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY));
    }

    @BeforeClass(groups = { "emulator" }, timeOut = SETUP_TIMEOUT)
    public void beforeClass() {
        Map<String, String> headers = new HashMap<>();
        headers.put(HttpConstants.HttpHeaders.WORKLOAD_ID, "15");

        clientWithWorkloadId = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .customHeaders(headers)
            .buildAsyncClient();

        database = createDatabase(clientWithWorkloadId, DATABASE_ID);

        PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
        ArrayList<String> paths = new ArrayList<>();
        paths.add("/mypk");
        partitionKeyDef.setPaths(paths);

        CosmosContainerProperties containerProperties = new CosmosContainerProperties(CONTAINER_ID, partitionKeyDef);
        database.createContainer(containerProperties).block();
        container = database.getContainer(CONTAINER_ID);
    }

    /**
     * Verifies that a create (POST) operation succeeds when the client
     * has a workload-id custom header set at the builder level. Confirms the header
     * flows through the request pipeline without causing errors.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void createItemWithClientLevelWorkloadId() {
        TestObject doc = TestObject.create();

        CosmosItemResponse<TestObject> response = container
            .createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
            .block();

        assertThat(response).isNotNull();
        assertThat(response.getStatusCode()).isEqualTo(201);
    }

    /**
     * Verifies that a read (GET) operation succeeds with the client-level workload-id
     * header and that the correct document is returned. Ensures the header does not
     * interfere with normal read semantics.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void readItemWithClientLevelWorkloadId() {
        // Verify read operation succeeds with workload-id header
        TestObject doc = TestObject.create();
        container.createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions()).block();

        CosmosItemResponse<TestObject> response = container
            .readItem(doc.getId(), new PartitionKey(doc.getMypk()), TestObject.class)
            .block();

        assertThat(response).isNotNull();
        assertThat(response.getStatusCode()).isEqualTo(200);
        assertThat(response.getItem().getId()).isEqualTo(doc.getId());
    }

    /**
     * Verifies that a replace (PUT) operation succeeds with the client-level workload-id
     * header. Confirms the header propagates correctly for update operations.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void replaceItemWithClientLevelWorkloadId() {
        // Verify replace operation succeeds with workload-id header
        TestObject doc = TestObject.create();
        container.createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions()).block();

        doc.setStringProp("updated-" + UUID.randomUUID());
        CosmosItemResponse<TestObject> response = container
            .replaceItem(doc, doc.getId(), new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
            .block();

        assertThat(response).isNotNull();
        assertThat(response.getStatusCode()).isEqualTo(200);
    }

    /**
     * Verifies that a delete operation succeeds with the client-level workload-id header
     * and returns the expected 204 No Content status code.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void deleteItemWithClientLevelWorkloadId() {
        // Verify delete operation succeeds with workload-id header
        TestObject doc = TestObject.create();
        container.createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions()).block();

        CosmosItemResponse<Object> response = container
            .deleteItem(doc.getId(), new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
            .block();

        assertThat(response).isNotNull();
        assertThat(response.getStatusCode()).isEqualTo(204);
    }

    /**
     * Verifies that a per-request workload-id header override via
     * {@code CosmosItemRequestOptions.setHeader()} works. The request-level header
     * (value "30") should take precedence over the client-level default (value "15").
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void createItemWithRequestLevelWorkloadIdOverride() {
        // Verify per-request header override works — request-level should take precedence
        TestObject doc = TestObject.create();
        CosmosItemRequestOptions options = new CosmosItemRequestOptions()
            .setHeader(HttpConstants.HttpHeaders.WORKLOAD_ID, "30");

        CosmosItemResponse<TestObject> response = container
            .createItem(doc, new PartitionKey(doc.getMypk()), options)
            .block();

        assertThat(response).isNotNull();
        assertThat(response.getStatusCode()).isEqualTo(201);
    }

    /**
     * Verifies that a cross-partition query operation succeeds when the client has a
     * workload-id custom header. Confirms the header flows correctly through the
     * query pipeline and does not affect result correctness.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void queryItemsWithClientLevelWorkloadId() {
        // Verify query operation succeeds with workload-id header
        TestObject doc = TestObject.create();
        container.createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions()).block();

        CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
        long count = container
            .queryItems("SELECT * FROM c WHERE c.id = '" + doc.getId() + "'", queryOptions, TestObject.class)
            .collectList()
            .block()
            .size();

        assertThat(count).isGreaterThanOrEqualTo(1);
    }

    /**
     * Verifies that a per-request workload-id header override on
     * {@code CosmosQueryRequestOptions.setHeader()} works for query operations.
     * The request-level header (value "42") should take precedence over the
     * client-level default.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void queryItemsWithRequestLevelWorkloadIdOverride() {
        // Verify per-request header override on query options works
        TestObject doc = TestObject.create();
        container.createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions()).block();

        CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions()
            .setHeader(HttpConstants.HttpHeaders.WORKLOAD_ID, "42");
        long count = container
            .queryItems("SELECT * FROM c WHERE c.id = '" + doc.getId() + "'", queryOptions, TestObject.class)
            .collectList()
            .block()
            .size();

        assertThat(count).isGreaterThanOrEqualTo(1);
    }

    /**
     * Regression test: verifies that a client created without any custom headers
     * continues to work normally. Ensures the custom headers feature does not
     * introduce regressions for clients that do not use it.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void clientWithNoCustomHeadersStillWorks() {
        // Verify that a client without custom headers works normally (no regression)
        CosmosAsyncClient clientWithoutHeaders = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .buildAsyncClient();

        try {
            CosmosAsyncContainer c = clientWithoutHeaders
                .getDatabase(DATABASE_ID)
                .getContainer(CONTAINER_ID);

            TestObject doc = TestObject.create();
            CosmosItemResponse<TestObject> response = c
                .createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
                .block();

            assertThat(response).isNotNull();
            assertThat(response.getStatusCode()).isEqualTo(201);
        } finally {
            safeClose(clientWithoutHeaders);
        }
    }

    /**
     * Verifies that a client created with an empty custom headers map works normally.
     * An empty map should behave identically to no custom headers — no errors,
     * no unexpected behavior.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void clientWithEmptyCustomHeaders() {
        // Verify that a client with empty custom headers map works normally
        CosmosAsyncClient clientWithEmptyHeaders = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .customHeaders(new HashMap<>())
            .buildAsyncClient();

        try {
            CosmosAsyncContainer c = clientWithEmptyHeaders
                .getDatabase(DATABASE_ID)
                .getContainer(CONTAINER_ID);

            TestObject doc = TestObject.create();
            CosmosItemResponse<TestObject> response = c
                .createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
                .block();

            assertThat(response).isNotNull();
            assertThat(response.getStatusCode()).isEqualTo(201);
        } finally {
            safeClose(clientWithEmptyHeaders);
        }
    }

    /**
     * Verifies that a client can be configured with multiple custom headers simultaneously
     * (workload-id plus an additional custom header). Confirms that all headers flow
     * through the pipeline without interfering with each other.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void clientWithMultipleCustomHeaders() {
        // Verify that multiple custom headers can be set simultaneously
        Map<String, String> headers = new HashMap<>();
        headers.put(HttpConstants.HttpHeaders.WORKLOAD_ID, "20");
        headers.put("x-ms-custom-test-header", "test-value");

        CosmosAsyncClient clientWithMultipleHeaders = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .customHeaders(headers)
            .buildAsyncClient();

        try {
            CosmosAsyncContainer c = clientWithMultipleHeaders
                .getDatabase(DATABASE_ID)
                .getContainer(CONTAINER_ID);

            TestObject doc = TestObject.create();
            CosmosItemResponse<TestObject> response = c
                .createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
                .block();

            assertThat(response).isNotNull();
            assertThat(response.getStatusCode()).isEqualTo(201);
        } finally {
            safeClose(clientWithMultipleHeaders);
        }
    }

    @AfterClass(groups = { "emulator" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
    public void afterClass() {
        safeDeleteDatabase(database);
        safeClose(clientWithWorkloadId);
    }
}
Resolved review threads (collapsed):
- sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosClientBuilder.java
- ...s/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java (outdated)
- .../main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java (outdated)
API Change Check
APIView identified API level changes in this PR and created the following API reviews
 * <p>
 * If the same header is also set on request options (e.g.,
 * {@code CosmosItemRequestOptions.setHeader(String, String)}),
 * the request-level value takes precedence over the client-level value.
We should also call out in the Javadoc that customHeaders take lower precedence than headers set through targeted APIs (see setConsistencyLevel, setReadConsistencyStrategy, etc.), and also validate this behavior.
I think this is a good question: when the same header can be set from multiple places, which one should take priority?
I am wondering whether we should start by allowing only the workload-id header. In direct mode, if the SDK does not know a header, it is going to drop that header anyway. And I feel that letting customers set important headers through customHeaders could cause a lot of confusion in the future.
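To make the precedence question concrete, here is a minimal sketch of layered header merging, assuming a simple rule where layers are applied from lowest to highest priority and a later layer overwrites an earlier one. `HeaderPrecedenceSketch`, `merge`, and the header name `x-example-workload-id` are hypothetical names for illustration, not the SDK's implementation or its real header constant.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of "last writer wins" header layering. */
final class HeaderPrecedenceSketch {

    /** Merges layers in order; a later layer overwrites an earlier one on the same name. */
    static Map<String, String> merge(List<Map<String, String>> layersLowToHigh) {
        Map<String, String> merged = new HashMap<>();
        for (Map<String, String> layer : layersLowToHigh) {
            if (layer != null) { // a missing layer contributes nothing
                merged.putAll(layer);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Client-level defaults, for example from a builder-level customHeaders map.
        Map<String, String> clientLevel = new HashMap<>();
        clientLevel.put("x-example-workload-id", "15");
        clientLevel.put("x-example-other", "kept");

        // Request-level override, for example from a setHeader call on request options.
        Map<String, String> requestLevel = new HashMap<>();
        requestLevel.put("x-example-workload-id", "30");

        Map<String, String> merged = merge(Arrays.asList(clientLevel, requestLevel));
        System.out.println(merged.get("x-example-workload-id")); // prints 30
        System.out.println(merged.get("x-example-other"));       // prints kept
    }
}
```

Under this rule, headers produced by targeted APIs would be passed as a later layer than customHeaders so that they win. Note that the sketch compares header names case-sensitively, which a real implementation may not want.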
 * @return the CosmosBatchRequestOptions.
 */
- CosmosBatchRequestOptions setHeader(String name, String value) {
+ public CosmosBatchRequestOptions setHeader(String name, String value) {
We can add such a setter for CosmosReadManyRequestOptions too.
key = CosmosConfigNames.CustomHeaders,
mandatory = false,
parseFromStringFunction = headersJson => {
  val mapper = new com.fasterxml.jackson.databind.ObjectMapper()
Is there a reusable instance of ObjectMapper, and how are we handling format issues?
Maybe we can just use Utils.simpleObjectMapperAllowingDuplicatedProperties.
private CosmosAsyncDatabase database;
private CosmosAsyncContainer container;

public WorkloadIdE2ETests() {
Do we need a client builder factory here? For example, @Factory(dataProvider = "simpleClientBuilderGatewaySession").
    final int workloadId = Integer.parseInt(value);
    this.getWorkloadId().setValue((byte) workloadId);
} catch (NumberFormatException e) {
    logger.warn("Invalid value for workload id header: {}", value, e);
Should we fail here rather than just silently suppressing the error?
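As one possible shape for the fail-fast alternative, here is a minimal sketch that rejects bad input instead of logging it. `WorkloadIdParser` and `parseStrict` are hypothetical names, and the 1-50 bounds come from the PR description rather than from the RNTBD code shown. The range check matters because the excerpt above casts the parsed int straight to a byte, and a narrowing cast keeps only the low 8 bits, so an input of 300 would become 44.

```java
/** Hypothetical strict parser; an illustration, not the PR's implementation. */
final class WorkloadIdParser {
    // Bounds taken from the PR description ("numeric identifier (1-50)").
    static final int MIN_WORKLOAD_ID = 1;
    static final int MAX_WORKLOAD_ID = 50;

    /** Parses a workload id header value, throwing instead of logging on bad input. */
    static byte parseStrict(String value) {
        if (value == null) {
            throw new IllegalArgumentException("workload id must not be null");
        }
        int id;
        try {
            id = Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("workload id must be an integer: " + value, e);
        }
        if (id < MIN_WORKLOAD_ID || id > MAX_WORKLOAD_ID) {
            throw new IllegalArgumentException(
                "workload id must be between " + MIN_WORKLOAD_ID + " and " + MAX_WORKLOAD_ID + ": " + id);
        }
        return (byte) id; // safe: the range check guarantees the value fits in a byte
    }

    public static void main(String[] args) {
        System.out.println(parseStrict("15")); // prints 15
        System.out.println((byte) 300);        // prints 44: an unchecked cast wraps silently
        try {
            parseStrict("300");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Whether to throw on the request path or to validate earlier, for example when the header is first set on the builder or the request options, is a design choice this sketch does not settle.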
@@ -1884,6 +1940,11 @@ public void validateAndLogNonDefaultReadConsistencyStrategy(String readConsisten
private Map<String, String> getRequestHeaders(RequestOptions options, ResourceType resourceType, OperationType operationType) {
    Map<String, String> headers = new HashMap<>();

    // Apply client-level custom headers first (e.g., workload-id from CosmosClientBuilder.customHeaders())
    if (this.customHeaders != null && !this.customHeaders.isEmpty()) {
QQ: does this header need to be sent on all types of requests from the SDK, including metadata requests (for example, address refresh)? If that is the case, then this header is not wired up everywhere, for example in GatewayAddressCache.
The Workload ID feature allows customers to tag their Cosmos DB requests with a numeric identifier (1-50) that flows through to Azure Monitor metrics. This enables customers to identify and monitor the source of their database requests without needing to enable expensive diagnostic logs.