Workload-ID feature #48128

Open
dibahlfi wants to merge 4 commits into main from users/dibahl/workload-id-feature
@dibahlfi
Member

The Workload ID feature allows customers to tag their Cosmos DB requests with a numeric identifier (1-50) that flows through to Azure Monitor metrics. This enables customers to identify and monitor the source of their database requests without needing to enable expensive diagnostic logs.

Copilot AI review requested due to automatic review settings February 26, 2026 01:00
@dibahlfi dibahlfi requested review from a team and kirankumarkolli as code owners February 26, 2026 01:00
Contributor

Copilot AI left a comment

Pull request overview

This pull request implements the Workload-ID feature for Azure Cosmos DB Java SDK, enabling customers to tag their database requests with a numeric identifier (1-50) that flows through to Azure Monitor metrics. This provides a lightweight alternative to diagnostic logs for monitoring the source of database requests.

Changes:

  • Added customHeaders() method to CosmosClientBuilder for setting client-level custom headers
  • Made setHeader() methods public on all request options classes (CosmosItemRequestOptions, CosmosBatchRequestOptions, CosmosBulkExecutionOptions, CosmosChangeFeedRequestOptions, CosmosQueryRequestOptions) for per-request header overrides
  • Implemented workload-id header support in the RNTBD (Direct mode) protocol layer with proper error handling
  • Integrated custom headers feature into Spark connector with JSON-based configuration
  • Added comprehensive unit tests and E2E integration tests

Reviewed changes

Copilot reviewed 23 out of 23 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| CosmosClientBuilder.java | Added customHeaders() method for client-level custom headers configuration |
| CosmosItemRequestOptions.java | Made setHeader() public with updated documentation |
| CosmosQueryRequestOptions.java | Added new public setHeader() method that delegates to internal implementation |
| CosmosChangeFeedRequestOptions.java | Made setHeader() public with updated documentation |
| CosmosBulkExecutionOptions.java | Made setHeader() public with updated documentation |
| CosmosBatchRequestOptions.java | Made setHeader() public with updated documentation |
| RxDocumentClientImpl.java | Added constructor overload accepting customHeaders and merged them into request headers |
| AsyncDocumentClient.java | Added Builder support for customHeaders parameter |
| CosmosAsyncClient.java | Passed customHeaders from builder to AsyncDocumentClient |
| RntbdRequestHeaders.java | Added addWorkloadId() method with error handling for parsing and byte conversion |
| RntbdConstants.java | Added WorkloadId enum entry with ID 0x00DC as a Byte type token |
| HttpConstants.java | Added WORKLOAD_ID header constant definition |
| WorkloadIdE2ETests.java | Comprehensive E2E tests covering CRUD, query, and regression scenarios |
| RntbdWorkloadIdTests.java | Unit tests verifying RNTBD header definition and valid/invalid value handling |
| CustomHeadersTests.java | Unit tests for CosmosClientBuilder and request options public API surface |
| CosmosConfig.scala | Added CustomHeaders config entry with JSON parsing for Spark connector |
| CosmosClientConfiguration.scala | Added customHeaders field to configuration case class |
| CosmosClientCache.scala | Applied customHeaders to CosmosClientBuilder and included in cache key |
| SparkE2EWorkloadIdITest.scala | E2E tests for Spark connector read/write with custom headers |
| CosmosClientConfigurationSpec.scala | Unit tests for JSON parsing of customHeaders configuration |
| Multiple test files | Updated test fixtures to include customHeaders: None for backward compatibility |
Comments suppressed due to low confidence (1)

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/WorkloadIdE2ETests.java:327

  • Test coverage is missing for batch and bulk operations with workload-id headers. Consider adding E2E tests similar to the existing CRUD tests that verify:
  1. CosmosBatchRequestOptions.setHeader() works correctly with batch operations
  2. CosmosBulkExecutionOptions.setHeader() works correctly with bulk operations

This would ensure the workload-id feature works end-to-end for all operation types, not just point operations and queries.

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.rx;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.TestObject;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.TestConfigurations;
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
import com.azure.cosmos.models.CosmosBulkOperations;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.PartitionKeyDefinition;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * End-to-end integration tests for the custom headers / workload-id feature.
 * <p>
 * Test type: EMULATOR INTEGRATION TEST — requires the Cosmos DB Emulator to be running locally.
 */
public class WorkloadIdE2ETests extends TestSuiteBase {

    private static final String DATABASE_ID = "workloadIdTestDb-" + UUID.randomUUID();
    private static final String CONTAINER_ID = "workloadIdTestContainer-" + UUID.randomUUID();

    private CosmosAsyncClient clientWithWorkloadId;
    private CosmosAsyncDatabase database;
    private CosmosAsyncContainer container;

    public WorkloadIdE2ETests() {
        super(new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY));
    }

    @BeforeClass(groups = { "emulator" }, timeOut = SETUP_TIMEOUT)
    public void beforeClass() {
        Map<String, String> headers = new HashMap<>();
        headers.put(HttpConstants.HttpHeaders.WORKLOAD_ID, "15");

        clientWithWorkloadId = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .customHeaders(headers)
            .buildAsyncClient();

        database = createDatabase(clientWithWorkloadId, DATABASE_ID);

        PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
        ArrayList<String> paths = new ArrayList<>();
        paths.add("/mypk");
        partitionKeyDef.setPaths(paths);
        CosmosContainerProperties containerProperties = new CosmosContainerProperties(CONTAINER_ID, partitionKeyDef);
        database.createContainer(containerProperties).block();
        container = database.getContainer(CONTAINER_ID);
    }

    /**
     * Verifies that a create (POST) operation succeeds when the client
     * has a workload-id custom header set at the builder level. Confirms the header
     * flows through the request pipeline without causing errors.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void createItemWithClientLevelWorkloadId() {
        TestObject doc = TestObject.create();

        CosmosItemResponse<TestObject> response = container
            .createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
            .block();

        assertThat(response).isNotNull();
        assertThat(response.getStatusCode()).isEqualTo(201);
    }

    /**
     * Verifies that a read (GET) operation succeeds with the client-level workload-id
     * header and that the correct document is returned. Ensures the header does not
     * interfere with normal read semantics.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void readItemWithClientLevelWorkloadId() {
        // Verify read operation succeeds with workload-id header
        TestObject doc = TestObject.create();
        container.createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions()).block();

        CosmosItemResponse<TestObject> response = container
            .readItem(doc.getId(), new PartitionKey(doc.getMypk()), TestObject.class)
            .block();

        assertThat(response).isNotNull();
        assertThat(response.getStatusCode()).isEqualTo(200);
        assertThat(response.getItem().getId()).isEqualTo(doc.getId());
    }

    /**
     * Verifies that a replace (PUT) operation succeeds with the client-level workload-id
     * header. Confirms the header propagates correctly for update operations.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void replaceItemWithClientLevelWorkloadId() {
        // Verify replace operation succeeds with workload-id header
        TestObject doc = TestObject.create();
        container.createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions()).block();

        doc.setStringProp("updated-" + UUID.randomUUID());
        CosmosItemResponse<TestObject> response = container
            .replaceItem(doc, doc.getId(), new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
            .block();

        assertThat(response).isNotNull();
        assertThat(response.getStatusCode()).isEqualTo(200);
    }

    /**
     * Verifies that a delete operation succeeds with the client-level workload-id header
     * and returns the expected 204 No Content status code.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void deleteItemWithClientLevelWorkloadId() {
        // Verify delete operation succeeds with workload-id header
        TestObject doc = TestObject.create();
        container.createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions()).block();

        CosmosItemResponse<Object> response = container
            .deleteItem(doc.getId(), new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
            .block();

        assertThat(response).isNotNull();
        assertThat(response.getStatusCode()).isEqualTo(204);
    }

    /**
     * Verifies that a per-request workload-id header override via
     * {@code CosmosItemRequestOptions.setHeader()} works. The request-level header
     * (value "30") should take precedence over the client-level default (value "15").
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void createItemWithRequestLevelWorkloadIdOverride() {
        // Verify per-request header override works — request-level should take precedence
        TestObject doc = TestObject.create();

        CosmosItemRequestOptions options = new CosmosItemRequestOptions()
            .setHeader(HttpConstants.HttpHeaders.WORKLOAD_ID, "30");

        CosmosItemResponse<TestObject> response = container
            .createItem(doc, new PartitionKey(doc.getMypk()), options)
            .block();

        assertThat(response).isNotNull();
        assertThat(response.getStatusCode()).isEqualTo(201);
    }

    /**
     * Verifies that a cross-partition query operation succeeds when the client has a
     * workload-id custom header. Confirms the header flows correctly through the
     * query pipeline and does not affect result correctness.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void queryItemsWithClientLevelWorkloadId() {
        // Verify query operation succeeds with workload-id header
        TestObject doc = TestObject.create();
        container.createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions()).block();

        CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
        long count = container
            .queryItems("SELECT * FROM c WHERE c.id = '" + doc.getId() + "'", queryOptions, TestObject.class)
            .collectList()
            .block()
            .size();

        assertThat(count).isGreaterThanOrEqualTo(1);
    }

    /**
     * Verifies that a per-request workload-id header override on
     * {@code CosmosQueryRequestOptions.setHeader()} works for query operations.
     * The request-level header (value "42") should take precedence over the
     * client-level default.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void queryItemsWithRequestLevelWorkloadIdOverride() {
        // Verify per-request header override on query options works
        TestObject doc = TestObject.create();
        container.createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions()).block();

        CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions()
            .setHeader(HttpConstants.HttpHeaders.WORKLOAD_ID, "42");

        long count = container
            .queryItems("SELECT * FROM c WHERE c.id = '" + doc.getId() + "'", queryOptions, TestObject.class)
            .collectList()
            .block()
            .size();

        assertThat(count).isGreaterThanOrEqualTo(1);
    }

    /**
     * Regression test: verifies that a client created without any custom headers
     * continues to work normally. Ensures the custom headers feature does not
     * introduce regressions for clients that do not use it.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void clientWithNoCustomHeadersStillWorks() {
        // Verify that a client without custom headers works normally (no regression)
        CosmosAsyncClient clientWithoutHeaders = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .buildAsyncClient();

        try {
            CosmosAsyncContainer c = clientWithoutHeaders
                .getDatabase(DATABASE_ID)
                .getContainer(CONTAINER_ID);

            TestObject doc = TestObject.create();
            CosmosItemResponse<TestObject> response = c
                .createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
                .block();

            assertThat(response).isNotNull();
            assertThat(response.getStatusCode()).isEqualTo(201);
        } finally {
            safeClose(clientWithoutHeaders);
        }
    }

    /**
     * Verifies that a client created with an empty custom headers map works normally.
     * An empty map should behave identically to no custom headers — no errors,
     * no unexpected behavior.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void clientWithEmptyCustomHeaders() {
        // Verify that a client with empty custom headers map works normally
        CosmosAsyncClient clientWithEmptyHeaders = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .customHeaders(new HashMap<>())
            .buildAsyncClient();

        try {
            CosmosAsyncContainer c = clientWithEmptyHeaders
                .getDatabase(DATABASE_ID)
                .getContainer(CONTAINER_ID);

            TestObject doc = TestObject.create();
            CosmosItemResponse<TestObject> response = c
                .createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
                .block();

            assertThat(response).isNotNull();
            assertThat(response.getStatusCode()).isEqualTo(201);
        } finally {
            safeClose(clientWithEmptyHeaders);
        }
    }

    /**
     * Verifies that a client can be configured with multiple custom headers simultaneously
     * (workload-id plus an additional custom header). Confirms that all headers flow
     * through the pipeline without interfering with each other.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void clientWithMultipleCustomHeaders() {
        // Verify that multiple custom headers can be set simultaneously
        Map<String, String> headers = new HashMap<>();
        headers.put(HttpConstants.HttpHeaders.WORKLOAD_ID, "20");
        headers.put("x-ms-custom-test-header", "test-value");

        CosmosAsyncClient clientWithMultipleHeaders = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .customHeaders(headers)
            .buildAsyncClient();

        try {
            CosmosAsyncContainer c = clientWithMultipleHeaders
                .getDatabase(DATABASE_ID)
                .getContainer(CONTAINER_ID);

            TestObject doc = TestObject.create();
            CosmosItemResponse<TestObject> response = c
                .createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
                .block();

            assertThat(response).isNotNull();
            assertThat(response.getStatusCode()).isEqualTo(201);
        } finally {
            safeClose(clientWithMultipleHeaders);
        }
    }

    @AfterClass(groups = { "emulator" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
    public void afterClass() {
        safeDeleteDatabase(database);
        safeClose(clientWithWorkloadId);
    }
}


@github-actions
Contributor

API Change Check

APIView identified API level changes in this PR and created the following API reviews

com.azure:azure-cosmos

* <p>
* If the same header is also set on request options (e.g.,
* {@code CosmosItemRequestOptions.setHeader(String, String)}),
* the request-level value takes precedence over the client-level value.
Member

@jeet1995 jeet1995 Feb 26, 2026

We should also call out in the Javadoc that customHeaders take lower precedence than headers set through targeted APIs (see setConsistencyLevel, setReadConsistencyStrategy, etc.), and validate this behavior.
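
The precedence being discussed can be sketched as a layered merge. This is only an illustration under stated assumptions: `HeaderPrecedence` and `merge` are hypothetical names, the header names are placeholders, and ranking targeted APIs above request-level `setHeader()` values is an assumed ordering, not something the PR confirms.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: HeaderPrecedence and merge(...) are hypothetical
// names, not part of the Azure Cosmos DB Java SDK.
final class HeaderPrecedence {

    // Merges header layers from lowest to highest precedence. A later layer
    // overwrites an earlier one when both define the same header name.
    // Names are compared case-sensitively here for simplicity.
    static Map<String, String> merge(
            Map<String, String> clientCustomHeaders,
            Map<String, String> requestLevelHeaders,
            Map<String, String> targetedApiHeaders) {
        Map<String, String> merged = new LinkedHashMap<>();
        if (clientCustomHeaders != null) {
            merged.putAll(clientCustomHeaders);   // lowest precedence
        }
        if (requestLevelHeaders != null) {
            merged.putAll(requestLevelHeaders);   // overrides client-level values
        }
        if (targetedApiHeaders != null) {
            merged.putAll(targetedApiHeaders);    // assumed highest precedence
        }
        return merged;
    }

    public static void main(String[] args) {
        // Placeholder header names, chosen only for this example.
        Map<String, String> client = new LinkedHashMap<>();
        client.put("workload-id", "15");
        client.put("consistency-level", "Eventual");

        Map<String, String> request = new LinkedHashMap<>();
        request.put("workload-id", "30");

        Map<String, String> targeted = new LinkedHashMap<>();
        targeted.put("consistency-level", "Session");

        System.out.println(merge(client, request, targeted));
    }
}
```

A unit test along these lines would be one way to "validate this": set the same header at two layers and assert which value survives.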

Member

@xinlian12 xinlian12 Feb 27, 2026

I think this is a good question: when the same header can be set from several places, which one should take priority?

I am wondering whether we should start by allowing only the workload-id header. In direct mode, if the SDK does not know a header, it is dropped anyway. Allowing customers to set important headers through customHeaders could, I feel, cause a lot of confusion in the future.
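
A minimal sketch of the allow-list idea, assuming validation happens when the client is built. `CustomHeaderAllowList` is a hypothetical class, and the header name string is an assumption: the PR refers to it only through `HttpConstants.HttpHeaders.WORKLOAD_ID`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only: CustomHeaderAllowList is a hypothetical helper,
// not part of the Azure Cosmos DB Java SDK.
final class CustomHeaderAllowList {

    // Assumed header name; the real value of the WORKLOAD_ID constant is not
    // shown in this conversation.
    static final String WORKLOAD_ID = "x-ms-cosmos-workload-id";

    private static final Set<String> ALLOWED = Collections.singleton(WORKLOAD_ID);

    // Returns an unmodifiable copy of the headers, or throws if any header
    // name is outside the allow-list.
    static Map<String, String> validate(Map<String, String> customHeaders) {
        if (customHeaders == null) {
            return Collections.emptyMap();
        }
        for (String name : customHeaders.keySet()) {
            if (!ALLOWED.contains(name)) {
                throw new IllegalArgumentException("Unsupported custom header: " + name);
            }
        }
        return Collections.unmodifiableMap(new HashMap<>(customHeaders));
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put(WORKLOAD_ID, "15");
        System.out.println(validate(headers));

        headers.put("x-ms-custom-test-header", "test-value");
        try {
            validate(headers);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Rejecting unknown names up front avoids the situation described above, where direct mode silently drops a header the customer believes is being sent.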

* @return the CosmosBatchRequestOptions.
*/
- CosmosBatchRequestOptions setHeader(String name, String value) {
+ public CosmosBatchRequestOptions setHeader(String name, String value) {
Member

@jeet1995 jeet1995 Feb 26, 2026

We can add such setter for CosmosReadManyRequestOptions too.

key = CosmosConfigNames.CustomHeaders,
mandatory = false,
parseFromStringFunction = headersJson => {
val mapper = new com.fasterxml.jackson.databind.ObjectMapper()
Member

@jeet1995 jeet1995 Feb 26, 2026

Is there a reusable instance of ObjectMapper and how are we handling format issues?

Member

maybe we can just use Utils.simpleObjectMapperAllowingDuplicatedProperties

private CosmosAsyncDatabase database;
private CosmosAsyncContainer container;

public WorkloadIdE2ETests() {
Member

Do we need a client builder factory here? For example, @Factory(dataProvider = "simpleClientBuilderGatewaySession").

final int workloadId = Integer.parseInt(value);
this.getWorkloadId().setValue((byte) workloadId);
} catch (NumberFormatException e) {
logger.warn("Invalid value for workload id header: {}", value, e);
Member

should we fail here rather than just silently suppress?
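
One way to fail fast is sketched below, assuming the documented 1-50 range is enforced at parse time. `WorkloadIdParser` is a hypothetical helper, not the PR's `addWorkloadId()` implementation. It also shows why the range check matters: a bare `(byte)` cast on an unchecked integer wraps around instead of failing.

```java
// Illustrative sketch only: WorkloadIdParser is a hypothetical helper,
// not code from the PR or the Azure Cosmos DB Java SDK.
final class WorkloadIdParser {

    static final int MIN_WORKLOAD_ID = 1;
    static final int MAX_WORKLOAD_ID = 50;

    // Parses the header value and throws instead of logging and continuing.
    static byte parseStrict(String value) {
        if (value == null) {
            throw new IllegalArgumentException("workload id must not be null");
        }
        final int id;
        try {
            id = Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("workload id is not an integer: " + value, e);
        }
        if (id < MIN_WORKLOAD_ID || id > MAX_WORKLOAD_ID) {
            throw new IllegalArgumentException(
                "workload id must be between " + MIN_WORKLOAD_ID + " and " + MAX_WORKLOAD_ID + ": " + id);
        }
        return (byte) id; // safe: every value in 1..50 fits in a byte
    }

    public static void main(String[] args) {
        System.out.println(parseStrict("15"));

        // Without a range check, narrowing silently wraps: 300 becomes 44.
        System.out.println((byte) 300);

        try {
            parseStrict("300");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Whether to throw or only log is the design question raised here. Throwing surfaces misconfiguration immediately, while logging keeps requests flowing with the tag missing.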

@@ -1884,6 +1940,11 @@ public void validateAndLogNonDefaultReadConsistencyStrategy(String readConsisten
private Map<String, String> getRequestHeaders(RequestOptions options, ResourceType resourceType, OperationType operationType) {
Map<String, String> headers = new HashMap<>();

// Apply client-level custom headers first (e.g., workload-id from CosmosClientBuilder.customHeaders())
if (this.customHeaders != null && !this.customHeaders.isEmpty()) {
Member

QQ: does this header need to be sent on all request types from the SDK, including metadata requests (for example, address refresh)? If that is the case, the header is not wired up everywhere, for example in GatewayAddressCache.


4 participants