From d68161445099653eec6547f6014c32187637acde Mon Sep 17 00:00:00 2001 From: Copilot <223556219+Copilot@users.noreply.github.com> Date: Mon, 15 Jun 2026 13:37:58 -0700 Subject: [PATCH 1/2] Add cold-start metadata cache cross-region hedging (port of dotnet #5923) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- sdk/cosmos/azure-cosmos/CHANGELOG.md | 1 + .../cosmos/_availability_strategy_config.py | 50 ++- .../azure/cosmos/_cosmos_client_connection.py | 8 + .../azure/cosmos/_metadata_hedging.py | 325 ++++++++++++++++++ .../azure/cosmos/_synchronized_request.py | 46 ++- .../azure/cosmos/aio/_asynchronous_request.py | 47 ++- .../azure/cosmos/aio/_cosmos_client.py | 10 + .../aio/_cosmos_client_connection_async.py | 8 + .../azure/cosmos/aio/_metadata_hedging.py | 245 +++++++++++++ .../azure/cosmos/cosmos_client.py | 9 + .../tests/test_metadata_hedging.py | 262 ++++++++++++++ .../tests/test_metadata_hedging_async.py | 180 ++++++++++ 12 files changed, 1187 insertions(+), 4 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos/azure/cosmos/_metadata_hedging.py create mode 100644 sdk/cosmos/azure-cosmos/azure/cosmos/aio/_metadata_hedging.py create mode 100644 sdk/cosmos/azure-cosmos/tests/test_metadata_hedging.py create mode 100644 sdk/cosmos/azure-cosmos/tests/test_metadata_hedging_async.py diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index b8b1c451ad10..5e4210a00df7 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -3,6 +3,7 @@ ### 4.16.2 (Unreleased) #### Features Added +* Added cold-start metadata cache cross-region hedging. When enabled, the SDK hedges the first-time population of the container and partition-key-range metadata caches to a second region if the primary region is slow, returning the first acceptable response. Controlled by the new `enable_metadata_hedging_for_cold_start` client keyword (tri-state: `None` follows the account's PPAF state, `True` forces it on, `False` disables it). #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_availability_strategy_config.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_availability_strategy_config.py index d2b9ed1e2283..2050662ebb69 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_availability_strategy_config.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_availability_strategy_config.py @@ -27,6 +27,14 @@ DEFAULT_THRESHOLD_MS = 500 DEFAULT_THRESHOLD_STEPS_MS = 100 +# Defaults for cold-start metadata cache hedging. These are SDK-derived and not +# customer-configurable. The threshold mirrors the .NET design (first-attempt +# control-plane timeout ~1s + a 500ms step => 1500ms). The concurrency budget +# caps the number of in-flight metadata hedges per client. +DEFAULT_METADATA_HEDGING_THRESHOLD_MS = 1500 +DEFAULT_METADATA_HEDGING_THRESHOLD_STEPS_MS = 500 +DEFAULT_METADATA_HEDGING_CONCURRENCY_BUDGET = 8 + class CrossRegionHedgingStrategy: """Configuration for cross-region request hedging strategy. @@ -34,7 +42,7 @@ class CrossRegionHedgingStrategy: :param config: Dictionary containing configuration values, defaults to None :type config: Optional[Dict[str, Any]] :raises ValueError: If configuration values are invalid - + The config dictionary can contain: - threshold_ms: Time in ms before routing to alternate region (default: 500) - threshold_steps_ms: Time interval between routing attempts (default: 100) @@ -53,11 +61,49 @@ def __init__(self, config: Optional[dict[str, Any]] = None) -> None: raise ValueError("threshold_steps_ms must be positive") +class MetadataCrossRegionHedgingStrategy(CrossRegionHedgingStrategy): + """Cold-start metadata cache cross-region hedging configuration. + + Unlike :class:`CrossRegionHedgingStrategy`, the metadata hedging threshold is a + fixed, SDK-derived value and is not customer-configurable. The strategy is bounded + to the primary request plus a single cross-region hedge. + """ + + def __init__(self) -> None: + super().__init__( + { + "threshold_ms": DEFAULT_METADATA_HEDGING_THRESHOLD_MS, + "threshold_steps_ms": DEFAULT_METADATA_HEDGING_THRESHOLD_STEPS_MS, + } + ) + + +def resolve_metadata_hedging_opt_in(opt_in: Optional[bool], ppaf_enabled: bool) -> bool: + """Resolve the tri-state cold-start metadata hedging opt-in to a concrete bool. + + When the customer leaves the opt-in ``None`` (the default), cold-start metadata + hedging follows the account's PPAF (Per-Partition Automatic Failover) state: it is + enabled when PPAF is enabled and disabled otherwise. An explicit ``True`` enables + hedging even when PPAF is disabled, and an explicit ``False`` disables it regardless + of PPAF. + + :param opt_in: The customer-supplied tri-state opt-in (True/False/None). + :type opt_in: Optional[bool] + :param ppaf_enabled: Whether Per-Partition Automatic Failover is enabled for the account. + :type ppaf_enabled: bool + :returns: True if cold-start metadata hedging should be applied, False otherwise. + :rtype: bool + """ + if opt_in is None: + return ppaf_enabled + return opt_in + + def _validate_request_hedging_strategy( config: Optional[Union[bool, dict[str, Any]]] ) -> Union[CrossRegionHedgingStrategy, bool, None]: """Validate and create a CrossRegionHedgingStrategy for a request. - + :param config: Configuration for availability strategy. Can be: - None: Returns None (no strategy, uses client default if available) - True: Returns strategy with default values (threshold_ms=500, threshold_steps_ms=100) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py index a5d01a7a12c9..f79a57ed5604 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py @@ -62,6 +62,7 @@ from . import http_constants, exceptions from ._auth_policy import CosmosBearerTokenCredentialPolicy from ._availability_strategy_config import validate_client_hedging_strategy, CrossRegionHedgingStrategy +from ._metadata_hedging import MetadataCrossRegionHedgingHandler from ._base import _build_properties_cache from ._change_feed.change_feed_iterable import ChangeFeedIterable from ._change_feed.change_feed_state import ChangeFeedState @@ -143,6 +144,7 @@ def __init__( # pylint: disable=too-many-statements consistency_level: Optional[str] = None, availability_strategy: Union[bool, dict[str, Any]] = False, availability_strategy_executor: Optional[ThreadPoolExecutor] = None, + enable_metadata_hedging_for_cold_start: Optional[bool] = None, **kwargs: Any ) -> None: """ @@ -170,6 +172,12 @@ def __init__( # pylint: disable=too-many-statements self.availability_strategy: Union[CrossRegionHedgingStrategy, None] =\ validate_client_hedging_strategy(availability_strategy) self.availability_strategy_executor: Optional[ThreadPoolExecutor] = availability_strategy_executor + # Tri-state opt-in for cold-start metadata cache cross-region hedging. None follows the + # account's PPAF state, True forces it on, False disables it (no handler is created). + self._metadata_hedging_opt_in: Optional[bool] = enable_metadata_hedging_for_cold_start + self._metadata_hedging_handler: Optional[MetadataCrossRegionHedgingHandler] = ( + None if enable_metadata_hedging_for_cold_start is False else MetadataCrossRegionHedgingHandler() + ) self.master_key: Optional[str] = None self.resource_tokens: Optional[Mapping[str, Any]] = None diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_metadata_hedging.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_metadata_hedging.py new file mode 100644 index 000000000000..a60063363844 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_metadata_hedging.py @@ -0,0 +1,325 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +"""Cold-start metadata cache cross-region hedging for Azure Cosmos DB. + +This is the synchronous port of the .NET ``MetadataHedgingStrategy`` (PR +Azure/azure-cosmos-dotnet-v3#5923). It provides bounded cross-region hedging for +cold-start metadata cache reads (container/Collection reads and PartitionKeyRange +read-feed reads): the primary request is dispatched immediately and, if it has not +produced an acceptable response within a fixed SDK-derived threshold, a single hedge +request is dispatched to a second region. The first acceptable winner is returned. +""" + +import copy +import logging +import os +import time +from concurrent.futures import CancelledError, Future, ThreadPoolExecutor, as_completed +from threading import Event, Semaphore +from types import SimpleNamespace +from typing import Any, Callable, Dict, List, Optional, Tuple + +from azure.core.exceptions import ServiceRequestError, ServiceResponseError # pylint: disable=no-legacy-azure-core-http-response-import +from azure.core.pipeline.transport import HttpRequest # pylint: disable=no-legacy-azure-core-http-response-import + +from . import exceptions +from ._availability_strategy_config import ( + DEFAULT_METADATA_HEDGING_CONCURRENCY_BUDGET, + MetadataCrossRegionHedgingStrategy, +) +from ._availability_strategy_handler_base import AvailabilityStrategyHandlerMixin +from ._global_partition_endpoint_manager_circuit_breaker import _GlobalPartitionEndpointManagerForCircuitBreaker +from ._request_object import RequestObject +from .documents import _OperationType +from .http_constants import ResourceType, StatusCodes, SubStatusCodes + +logger = logging.getLogger("azure.cosmos.metadata_hedging") + +ResponseType = Tuple[Dict[str, Any], Dict[str, Any]] + + +class MetadataHedgeSkipReason: + """Reason a cold-start metadata hedge was not dispatched (for diagnostics/logging).""" + NONE = "None" + NOT_SUPPORTED_RESOURCE = "ResourceTypeNotSupported" + ALREADY_HEDGING = "AlreadyHedgedThisOperation" + SINGLE_REGION = "SingleRegion" + BUDGET_EXHAUSTED = "BudgetExhausted" + + +def is_supported_metadata_request(request_params: RequestObject) -> bool: + """Return True if the request is a metadata read eligible for cold-start hedging. + + Supported reads mirror the .NET design: a Collection read or a PartitionKeyRange + read-feed. PartitionKeyRange reads issued as a plain ``Read`` are also accepted + defensively. + + :param request_params: The request parameters. + :type request_params: ~azure.cosmos._request_object.RequestObject + :returns: True if the request is a supported metadata read. + :rtype: bool + """ + resource_type = request_params.resource_type + operation_type = request_params.operation_type + if resource_type == ResourceType.Collection and operation_type == _OperationType.Read: + return True + if resource_type == ResourceType.PartitionKeyRange and operation_type in ( + _OperationType.ReadFeed, + _OperationType.Read, + ): + return True + return False + + +def is_regional_failure( + status_code: Optional[int], + sub_status: Optional[int], + exception: Optional[BaseException], +) -> bool: + """Return True if the response/exception is a regional failure for metadata hedging. + + A regional failure is one that should advance a metadata read to a different region + (so it must not be accepted as a winner). This mirrors the .NET ``IsRegionalFailure`` + classification: transport-level failures and timeouts, ``503``, ``500``, and + ``403`` with sub-status ``DatabaseAccountNotFound``. + + :param status_code: HTTP status code from the response, or None for a transport failure. + :type status_code: Optional[int] + :param sub_status: Cosmos sub-status code from the response. + :type sub_status: Optional[int] + :param exception: Exception observed instead of (or in addition to) the response, or None. + :type exception: Optional[BaseException] + :returns: True if the failure is regional. + :rtype: bool + """ + if isinstance(exception, (ServiceRequestError, ServiceResponseError)): + return True + + if status_code is None: + return False + + if status_code in (StatusCodes.SERVICE_UNAVAILABLE, StatusCodes.INTERNAL_SERVER_ERROR): + return True + if status_code == StatusCodes.FORBIDDEN and sub_status == SubStatusCodes.DATABASE_ACCOUNT_NOT_FOUND: + return True + return False + + +def _status_codes_from_exception(exception: BaseException) -> Tuple[Optional[int], Optional[int]]: + if isinstance(exception, exceptions.CosmosHttpResponseError): + return exception.status_code, exception.sub_status + return None, None + + +class MetadataCrossRegionHedgingHandler(AvailabilityStrategyHandlerMixin): + """Bounded cross-region hedging handler for cold-start metadata cache reads. + + One instance per client. The per-client concurrency budget caps the number of + in-flight metadata hedges; when it is exhausted, eligible requests fall back to a + primary-only send. + + :param concurrency_budget: Max number of in-flight metadata hedges for this client. + :type concurrency_budget: int + """ + + def __init__(self, concurrency_budget: int = DEFAULT_METADATA_HEDGING_CONCURRENCY_BUDGET) -> None: + self._budget = Semaphore(max(1, concurrency_budget)) + # Long-lived executor shared across this client's metadata hedges. + self._executor = ThreadPoolExecutor(max_workers=os.cpu_count()) # pylint: disable=consider-using-with + + def is_acceptable_winner( # pylint: disable=unused-argument + self, + result: Optional[ResponseType], + exception: Optional[BaseException], + is_hedge: bool, + ) -> bool: + """Return True if a settled branch is an acceptable winner. + + A successful response is always acceptable. A regional-failure exception is never + acceptable (the other branch should win). A hedge-branch ``401``/``403`` response + is rejected so a hedge auth failure can never win over the primary. + + :param result: The successful response tuple, or None if the branch raised. + :type result: Optional[Tuple[dict, dict]] + :param exception: The exception raised by the branch, or None on success. + :type exception: Optional[BaseException] + :param is_hedge: Whether the branch is the hedge (non-primary) branch. + :type is_hedge: bool + :returns: True if the branch is an acceptable winner. + :rtype: bool + """ + if exception is None: + return True + + if isinstance(exception, CancelledError): + return False + + status_code, sub_status = _status_codes_from_exception(exception) + + if is_regional_failure(status_code, sub_status, exception): + return False + + if is_hedge and status_code in (StatusCodes.UNAUTHORIZED, StatusCodes.FORBIDDEN): + return False + + # A non-regional, non-auth definitive error (e.g. 404) is a real answer; surface it. + return True + + def _send_with_delay( + self, + request_params: RequestObject, + request: HttpRequest, + execute_request_fn: Callable[..., ResponseType], + location_index: int, + available_locations: List[str], + complete_status: Event, + first_request_params_holder: SimpleNamespace, + ) -> ResponseType: + strategy = request_params.availability_strategy + if strategy is None: + raise ValueError("availability_strategy should not be null for metadata hedging") + + delay = 0 if location_index == 0 else strategy.threshold_ms + if delay > 0: + time.sleep(delay / 1000) + + params = copy.deepcopy(request_params) + params.is_hedging_request = location_index > 0 + params.completion_status = complete_status + params.excluded_locations = self._create_excluded_regions_for_hedging( + location_index, + available_locations, + request_params.excluded_locations, + ) + + req = copy.deepcopy(request) + if location_index == 0: + first_request_params_holder.request_params = params + + if complete_status.is_set(): + raise CancelledError("The request has been cancelled") + + return execute_request_fn(params, req) + + def execute_request( + self, + request_params: RequestObject, + global_endpoint_manager: _GlobalPartitionEndpointManagerForCircuitBreaker, + request: HttpRequest, + execute_request_fn: Callable[..., ResponseType], + ) -> ResponseType: + """Execute a metadata read with bounded primary + single-hedge cross-region hedging. + + :param request_params: Request parameters for the metadata read. + :type request_params: ~azure.cosmos._request_object.RequestObject + :param global_endpoint_manager: Manager for endpoint routing and health tracking. + :type global_endpoint_manager: + ~azure.cosmos._GlobalPartitionEndpointManagerForCircuitBreaker + :param request: The HTTP request to be executed. + :type request: ~azure.core.pipeline.transport.HttpRequest + :param execute_request_fn: Function that executes the actual request. + :type execute_request_fn: Callable[..., Tuple[dict, dict]] + :returns: The winning response tuple. + :rtype: Tuple[dict, dict] + """ + available_locations = self._get_applicable_endpoints(request_params, global_endpoint_manager) + if len(available_locations) <= 1: + logger.debug("Metadata hedge skipped: %s", MetadataHedgeSkipReason.SINGLE_REGION) + return execute_request_fn(request_params, request) + + acquired = self._budget.acquire(blocking=False) # pylint: disable=consider-using-with + if not acquired: + logger.debug("Metadata hedge skipped: %s", MetadataHedgeSkipReason.BUDGET_EXHAUSTED) + return execute_request_fn(request_params, request) + + completion_status = Event() + first_request_params_holder: SimpleNamespace = SimpleNamespace(request_params=None) + try: + primary_future = self._executor.submit( + self._send_with_delay, request_params, request, execute_request_fn, + 0, available_locations, completion_status, first_request_params_holder) + hedge_future = self._executor.submit( + self._send_with_delay, request_params, request, execute_request_fn, + 1, available_locations, completion_status, first_request_params_holder) + futures: List[Future] = [primary_future, hedge_future] + + for completed_future in as_completed(futures): + is_primary = completed_future is primary_future + exception = completed_future.exception() + result = None if exception is not None else completed_future.result() + + if self.is_acceptable_winner(result, exception, is_hedge=not is_primary): + completion_status.set() + if not is_primary: + # Hedge won; record a failure for the primary region so health + # tracking can react to the slow/failed primary. + self._record_cancel_for_first_request(first_request_params_holder, global_endpoint_manager) + if exception is not None: + raise exception + return result # type: ignore[return-value] + + # Neither branch produced an acceptable winner; prefer the primary's outcome + # so the metadata read fails the same way it would without hedging. + completion_status.set() + primary_exception = primary_future.exception() + if primary_exception is not None: + raise primary_exception + return primary_future.result() + finally: + completion_status.set() + self._budget.release() + + def _record_cancel_for_first_request( + self, + request_params_holder: SimpleNamespace, + global_endpoint_manager: Any, + ) -> None: + if request_params_holder.request_params is not None: + global_endpoint_manager.record_failure(request_params_holder.request_params) + + +def execute_metadata_hedging( + handler: MetadataCrossRegionHedgingHandler, + request_params: RequestObject, + global_endpoint_manager: _GlobalPartitionEndpointManagerForCircuitBreaker, + request: HttpRequest, + execute_request_fn: Callable[..., ResponseType], +) -> ResponseType: + """Execute a metadata read with cold-start cross-region hedging. + + :param handler: The per-client metadata hedging handler. + :type handler: ~azure.cosmos._metadata_hedging.MetadataCrossRegionHedgingHandler + :param request_params: Request parameters for the metadata read. + :type request_params: ~azure.cosmos._request_object.RequestObject + :param global_endpoint_manager: Manager for endpoint routing and health tracking. + :type global_endpoint_manager: + ~azure.cosmos._GlobalPartitionEndpointManagerForCircuitBreaker + :param request: The HTTP request to be executed. + :type request: ~azure.core.pipeline.transport.HttpRequest + :param execute_request_fn: Function that executes the actual request. + :type execute_request_fn: Callable[..., Tuple[dict, dict]] + :returns: The winning response tuple. + :rtype: Tuple[dict, dict] + """ + if request_params.availability_strategy is None: + request_params.availability_strategy = MetadataCrossRegionHedgingStrategy() + return handler.execute_request(request_params, global_endpoint_manager, request, execute_request_fn) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py index 1a7e24e5feba..e26b34744cef 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py @@ -30,8 +30,9 @@ from azure.core.exceptions import DecodeError # type: ignore from . import exceptions, http_constants, _retry_utility -from ._availability_strategy_config import CrossRegionHedgingStrategy +from ._availability_strategy_config import CrossRegionHedgingStrategy, resolve_metadata_hedging_opt_in from ._availability_strategy_handler import execute_with_hedging +from ._metadata_hedging import execute_metadata_hedging, is_supported_metadata_request from ._constants import _Constants from ._response_decoding import decode_response_body_for_status from ._request_object import RequestObject @@ -246,6 +247,30 @@ def _is_availability_strategy_applicable(request_params: RequestObject) -> bool: request_params.retry_write > 0)) +def _is_metadata_hedging_applicable(client, request_params: RequestObject, global_endpoint_manager) -> bool: + """Determine if cold-start metadata cache hedging should be applied to the request. + + :param client: Document client instance. + :type client: object + :param request_params: Request parameters containing operation details. + :type request_params: ~azure.cosmos._request_object.RequestObject + :param global_endpoint_manager: Manager for endpoint routing and PPAF state. + :type global_endpoint_manager: object + :returns: True if metadata hedging should be applied, False otherwise. + :rtype: bool + """ + handler = getattr(client, "_metadata_hedging_handler", None) + if handler is None: + return False + if request_params.is_hedging_request or request_params.availability_strategy is not None: + return False + if not is_supported_metadata_request(request_params): + return False + opt_in = getattr(client, "_metadata_hedging_opt_in", None) + ppaf_enabled = global_endpoint_manager.is_per_partition_automatic_failover_enabled() + return resolve_metadata_hedging_opt_in(opt_in, ppaf_enabled) + + def _replace_url_prefix(original_url, new_prefix): parts = original_url.split('/', 3) @@ -294,6 +319,25 @@ def SynchronizedRequest( elif request.data is None: request.headers[http_constants.HttpHeaders.ContentLength] = 0 + # Cold-start metadata cache cross-region hedging (Collection / PartitionKeyRange reads). + if _is_metadata_hedging_applicable(client, request_params, global_endpoint_manager): + return execute_metadata_hedging( + client._metadata_hedging_handler, # pylint: disable=protected-access + request_params, + global_endpoint_manager, + request, + lambda req_param, r: _retry_utility.Execute( + client, + global_endpoint_manager, + _Request, + req_param, + connection_policy, + pipeline_client, + r, + **kwargs + ) + ) + if request_params.availability_strategy is None: # if ppaf is enabled, then hedging is enabled by default if global_endpoint_manager.is_per_partition_automatic_failover_enabled(): diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py index ce7d5a44536c..016e6e8510f6 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py @@ -30,9 +30,11 @@ from . import _retry_utility_async from ._asynchronous_availability_strategy_handler import execute_with_availability_strategy +from ._metadata_hedging import execute_metadata_hedging as execute_metadata_hedging_async from .. import exceptions from .. import http_constants -from .._availability_strategy_config import CrossRegionHedgingStrategy +from .._availability_strategy_config import CrossRegionHedgingStrategy, resolve_metadata_hedging_opt_in +from .._metadata_hedging import is_supported_metadata_request from .._constants import _Constants from .._request_object import RequestObject from .._response_decoding import decode_response_body_for_status @@ -215,6 +217,30 @@ def _is_availability_strategy_applicable(request_params: RequestObject) -> bool: (not _OperationType.IsWriteOperation(request_params.operation_type) or request_params.retry_write > 0)) + +def _is_metadata_hedging_applicable(client, request_params: RequestObject, global_endpoint_manager) -> bool: + """Determine if cold-start metadata cache hedging should be applied to the request. + + :param client: Document client instance. + :type client: object + :param request_params: Request parameters containing operation details. + :type request_params: ~azure.cosmos._request_object.RequestObject + :param global_endpoint_manager: Manager for endpoint routing and PPAF state. + :type global_endpoint_manager: object + :returns: True if metadata hedging should be applied, False otherwise. + :rtype: bool + """ + handler = getattr(client, "_metadata_hedging_handler", None) + if handler is None: + return False + if request_params.is_hedging_request or request_params.availability_strategy is not None: + return False + if not is_supported_metadata_request(request_params): + return False + opt_in = getattr(client, "_metadata_hedging_opt_in", None) + ppaf_enabled = global_endpoint_manager.is_per_partition_automatic_failover_enabled() + return resolve_metadata_hedging_opt_in(opt_in, ppaf_enabled) + async def AsynchronousRequest( client, request_params, @@ -247,6 +273,25 @@ async def AsynchronousRequest( elif request.data is None: request.headers[http_constants.HttpHeaders.ContentLength] = 0 + # Cold-start metadata cache cross-region hedging (Collection / PartitionKeyRange reads). + if _is_metadata_hedging_applicable(client, request_params, global_endpoint_manager): + return await execute_metadata_hedging_async( + client._metadata_hedging_handler, # pylint: disable=protected-access + request_params, + global_endpoint_manager, + request, + lambda req_param, r: _retry_utility_async.ExecuteAsync( + client, + global_endpoint_manager, + _Request, + req_param, + connection_policy, + pipeline_client, + r, + **kwargs + ) + ) + if request_params.availability_strategy is None: # if ppaf is enabled, then hedging is enabled by default if global_endpoint_manager.is_per_partition_automatic_failover_enabled(): diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py index fa109d594c31..a3bd41732258 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py @@ -192,6 +192,14 @@ class CosmosClient: # pylint: disable=client-accepts-api-version-keyword Default value is False (hedging disabled). :paramtype availability_strategy: Union[bool, dict[str, Any]] :keyword int availability_strategy_max_concurrency: The max concurrency for parallel requests. + :keyword Optional[bool] enable_metadata_hedging_for_cold_start: + Tri-state opt-in for cold-start metadata cache cross-region hedging. When ``None`` + (the default), it follows the account's Per-Partition Automatic Failover (PPAF) + state. When ``True``, the SDK hedges the first-time population of the container and + partition-key-range metadata caches across regions even when PPAF is disabled. When + ``False``, metadata hedging is suppressed regardless of PPAF. The threshold and other + tuning knobs are SDK-derived defaults and are not customer-configurable. + :paramtype enable_metadata_hedging_for_cold_start: Optional[bool] .. admonition:: Example: @@ -212,6 +220,7 @@ def __init__( consistency_level: Optional[str] = None, availability_strategy: Union[bool, dict[str, Any]] = False, availability_strategy_max_concurrency: Optional[int] = None, + enable_metadata_hedging_for_cold_start: Optional[bool] = None, **kwargs: Any ) -> None: """Instantiate a new CosmosClient.""" @@ -224,6 +233,7 @@ def __init__( connection_policy=connection_policy, availability_strategy=availability_strategy, availability_strategy_max_concurrency=availability_strategy_max_concurrency, + enable_metadata_hedging_for_cold_start=enable_metadata_hedging_for_cold_start, **kwargs ) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py index 1f9f2ca87369..92f4cc16bff3 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py @@ -49,6 +49,7 @@ _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailoverAsync) from .. import _base as base from .._availability_strategy_config import CrossRegionHedgingStrategy, validate_client_hedging_strategy +from ._metadata_hedging import MetadataCrossRegionAsyncHedgingHandler from .._base import _build_properties_cache from .. import documents from .._change_feed.aio.change_feed_iterable import ChangeFeedIterable @@ -139,6 +140,7 @@ def __init__( # pylint: disable=too-many-statements consistency_level: Optional[str] = None, availability_strategy: Union[bool, dict[str, Any]] = False, availability_strategy_max_concurrency: Optional[int] = None, + enable_metadata_hedging_for_cold_start: Optional[bool] = None, **kwargs: Any ) -> None: """ @@ -162,6 +164,12 @@ def __init__( # pylint: disable=too-many-statements self.availability_strategy: Union[CrossRegionHedgingStrategy, None] =\ validate_client_hedging_strategy(availability_strategy) self.availability_strategy_max_concurrency: Optional[int] = availability_strategy_max_concurrency + # Tri-state opt-in for cold-start metadata cache cross-region hedging. None follows the + # account's PPAF state, True forces it on, False disables it (no handler is created). + self._metadata_hedging_opt_in: Optional[bool] = enable_metadata_hedging_for_cold_start + self._metadata_hedging_handler: Optional[MetadataCrossRegionAsyncHedgingHandler] = ( + None if enable_metadata_hedging_for_cold_start is False else MetadataCrossRegionAsyncHedgingHandler() + ) self.master_key: Optional[str] = None self.resource_tokens: Optional[Mapping[str, Any]] = None self.aad_credentials: Optional[AsyncTokenCredential] = None diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_metadata_hedging.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_metadata_hedging.py new file mode 100644 index 000000000000..7a9d6a1297f7 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_metadata_hedging.py @@ -0,0 +1,245 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +"""Asynchronous cold-start metadata cache cross-region hedging for Azure Cosmos DB. + +Async port of the .NET ``MetadataHedgingStrategy`` (PR +Azure/azure-cosmos-dotnet-v3#5923). See :mod:`azure.cosmos._metadata_hedging` for the +synchronous counterpart and design notes. +""" + +import asyncio # pylint: disable=do-not-import-asyncio +import copy +import logging +from asyncio import CancelledError, Event, Task +from types import SimpleNamespace +from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple + +from azure.core.pipeline.transport import HttpRequest # pylint: disable=no-legacy-azure-core-http-response-import + +from .._availability_strategy_config import ( + DEFAULT_METADATA_HEDGING_CONCURRENCY_BUDGET, + MetadataCrossRegionHedgingStrategy, +) +from .._availability_strategy_handler_base import AvailabilityStrategyHandlerMixin +from .._metadata_hedging import _status_codes_from_exception, is_regional_failure +from .._request_object import RequestObject +from ..http_constants import StatusCodes +from ._global_partition_endpoint_manager_circuit_breaker_async import \ + _GlobalPartitionEndpointManagerForCircuitBreakerAsync + +logger = logging.getLogger("azure.cosmos.metadata_hedging") + +ResponseType = Tuple[Dict[str, Any], Dict[str, Any]] + + +class MetadataCrossRegionAsyncHedgingHandler(AvailabilityStrategyHandlerMixin): + """Bounded async cross-region hedging handler for cold-start metadata cache reads. + + One instance per client. The per-client concurrency budget caps the number of + in-flight metadata hedges; when it is exhausted, eligible requests fall back to a + primary-only send. + + :param concurrency_budget: Max number of in-flight metadata hedges for this client. + :type concurrency_budget: int + """ + + def __init__(self, concurrency_budget: int = DEFAULT_METADATA_HEDGING_CONCURRENCY_BUDGET) -> None: + self._budget = asyncio.Semaphore(max(1, concurrency_budget)) + + def is_acceptable_winner( # pylint: disable=unused-argument + self, + result: Optional[ResponseType], + exception: Optional[BaseException], + is_hedge: bool, + ) -> bool: + """Return True if a settled branch is an acceptable winner. + + :param result: The successful response tuple, or None if the branch raised. + :type result: Optional[Tuple[dict, dict]] + :param exception: The exception raised by the branch, or None on success. + :type exception: Optional[BaseException] + :param is_hedge: Whether the branch is the hedge (non-primary) branch. + :type is_hedge: bool + :returns: True if the branch is an acceptable winner. + :rtype: bool + """ + if exception is None: + return True + + if isinstance(exception, CancelledError): + return False + + status_code, sub_status = _status_codes_from_exception(exception) + + if is_regional_failure(status_code, sub_status, exception): + return False + + if is_hedge and status_code in (StatusCodes.UNAUTHORIZED, StatusCodes.FORBIDDEN): + return False + + return True + + async def _send_with_delay( + self, + request_params: RequestObject, + request: HttpRequest, + execute_request_fn: Callable[..., Awaitable[ResponseType]], + location_index: int, + available_locations: List[str], + complete_status: Event, + first_request_params_holder: SimpleNamespace, + ) -> ResponseType: + strategy = request_params.availability_strategy + if strategy is None: + raise ValueError("availability_strategy should not be null for metadata hedging") + + delay = 0 if location_index == 0 else strategy.threshold_ms + if delay > 0: + await asyncio.sleep(delay / 1000) + + params = copy.deepcopy(request_params) + params.is_hedging_request = location_index > 0 + params.completion_status = complete_status + params.excluded_locations = self._create_excluded_regions_for_hedging( + location_index, + available_locations, + request_params.excluded_locations, + ) + + req = copy.deepcopy(request) + if location_index == 0: + first_request_params_holder.request_params = params + + if complete_status.is_set(): + raise CancelledError("The request has been cancelled") + + return await execute_request_fn(params, req) + + async def execute_request( + self, + request_params: RequestObject, + global_endpoint_manager: _GlobalPartitionEndpointManagerForCircuitBreakerAsync, + request: HttpRequest, + execute_request_fn: Callable[..., Awaitable[ResponseType]], + ) -> ResponseType: + """Execute a metadata read with bounded primary + single-hedge cross-region hedging. + + :param request_params: Request parameters for the metadata read. + :type request_params: ~azure.cosmos._request_object.RequestObject + :param global_endpoint_manager: Manager for endpoint routing and health tracking. + :type global_endpoint_manager: + ~azure.cosmos.aio._GlobalPartitionEndpointManagerForCircuitBreakerAsync + :param request: The HTTP request to be executed. + :type request: ~azure.core.pipeline.transport.HttpRequest + :param execute_request_fn: Async function that executes the actual request. + :type execute_request_fn: Callable[..., Awaitable[Tuple[dict, dict]]] + :returns: The winning response tuple. + :rtype: Tuple[dict, dict] + """ + available_locations = self._get_applicable_endpoints(request_params, global_endpoint_manager) + if len(available_locations) <= 1: + logger.debug("Metadata hedge skipped: SingleRegion") + return await execute_request_fn(request_params, request) + + if self._budget.locked(): + logger.debug("Metadata hedge skipped: BudgetExhausted") + return await execute_request_fn(request_params, request) + + await self._budget.acquire() + completion_status = Event() + first_request_params_holder: SimpleNamespace = SimpleNamespace(request_params=None) + active_tasks: List[Task] = [] + try: + primary_task = asyncio.create_task(self._send_with_delay( + request_params, request, execute_request_fn, + 0, available_locations, completion_status, first_request_params_holder)) + hedge_task = asyncio.create_task(self._send_with_delay( + request_params, request, execute_request_fn, + 1, available_locations, completion_status, first_request_params_holder)) + active_tasks = [primary_task, hedge_task] + + pending = set(active_tasks) + while pending: + done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) + for completed_task in done: + is_primary = completed_task is primary_task + exception = completed_task.exception() + result = None if exception is not None else completed_task.result() + + if self.is_acceptable_winner(result, exception, is_hedge=not is_primary): + completion_status.set() + if not is_primary: + await self._record_cancel_for_first_request( + first_request_params_holder, global_endpoint_manager) + if exception is not None: + raise exception + return result # type: ignore[return-value] + + # Neither branch produced an acceptable winner; prefer the primary's outcome. + completion_status.set() + primary_exception = primary_task.exception() + if primary_exception is not None: + raise primary_exception + return primary_task.result() + finally: + completion_status.set() + for task in active_tasks: + if not task.done(): + task.cancel() + await asyncio.gather(*active_tasks, return_exceptions=True) + self._budget.release() + + async def _record_cancel_for_first_request( + self, + request_params_holder: SimpleNamespace, + global_endpoint_manager: Any, + ) -> None: + if request_params_holder.request_params is not None: + await global_endpoint_manager.record_failure(request_params_holder.request_params) + + +async def execute_metadata_hedging( + handler: MetadataCrossRegionAsyncHedgingHandler, + request_params: RequestObject, + global_endpoint_manager: _GlobalPartitionEndpointManagerForCircuitBreakerAsync, + request: HttpRequest, + execute_request_fn: Callable[..., Awaitable[ResponseType]], +) -> ResponseType: + """Execute a metadata read with cold-start cross-region hedging. + + :param handler: The per-client metadata hedging handler. + :type handler: ~azure.cosmos.aio._metadata_hedging.MetadataCrossRegionAsyncHedgingHandler + :param request_params: Request parameters for the metadata read. + :type request_params: ~azure.cosmos._request_object.RequestObject + :param global_endpoint_manager: Manager for endpoint routing and health tracking. + :type global_endpoint_manager: + ~azure.cosmos.aio._GlobalPartitionEndpointManagerForCircuitBreakerAsync + :param request: The HTTP request to be executed. + :type request: ~azure.core.pipeline.transport.HttpRequest + :param execute_request_fn: Async function that executes the actual request. + :type execute_request_fn: Callable[..., Awaitable[Tuple[dict, dict]]] + :returns: The winning response tuple. + :rtype: Tuple[dict, dict] + """ + if request_params.availability_strategy is None: + request_params.availability_strategy = MetadataCrossRegionHedgingStrategy() + return await handler.execute_request(request_params, global_endpoint_manager, request, execute_request_fn) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py b/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py index 360bdca53a63..4508c45ac92b 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py @@ -215,6 +215,14 @@ class CosmosClient: # pylint: disable=client-accepts-api-version-keyword :paramtype availability_strategy: Union[bool, dict[str, Any]] :keyword ~concurrent.futures.thread.ThreadPoolExecutor availability_strategy_executor: Optional ThreadPoolExecutor for handling concurrent operations. + :keyword Optional[bool] enable_metadata_hedging_for_cold_start: + Tri-state opt-in for cold-start metadata cache cross-region hedging. When ``None`` + (the default), it follows the account's Per-Partition Automatic Failover (PPAF) + state. When ``True``, the SDK hedges the first-time population of the container and + partition-key-range metadata caches across regions even when PPAF is disabled. When + ``False``, metadata hedging is suppressed regardless of PPAF. The threshold and other + tuning knobs are SDK-derived defaults and are not customer-configurable. + :paramtype enable_metadata_hedging_for_cold_start: Optional[bool] .. admonition:: Example: @@ -245,6 +253,7 @@ def __init__( connection_policy=connection_policy, availability_strategy=kwargs.pop("availability_strategy", False), availability_strategy_executor=kwargs.pop("availability_strategy_executor", None), + enable_metadata_hedging_for_cold_start=kwargs.pop("enable_metadata_hedging_for_cold_start", None), **kwargs ) diff --git a/sdk/cosmos/azure-cosmos/tests/test_metadata_hedging.py b/sdk/cosmos/azure-cosmos/tests/test_metadata_hedging.py new file mode 100644 index 000000000000..172ef8f5135d --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/test_metadata_hedging.py @@ -0,0 +1,262 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. + +"""Unit tests for cold-start metadata cache cross-region hedging (sync).""" + +import threading +import time +import unittest + +from azure.core.exceptions import ServiceRequestError +from azure.core.pipeline.transport import HttpRequest + +from azure.cosmos._availability_strategy_config import ( + MetadataCrossRegionHedgingStrategy, + resolve_metadata_hedging_opt_in, +) +from azure.cosmos._metadata_hedging import ( + MetadataCrossRegionHedgingHandler, + execute_metadata_hedging, + is_regional_failure, + is_supported_metadata_request, +) +from azure.cosmos._request_object import RequestObject +from azure.cosmos.documents import _OperationType +from azure.cosmos.exceptions import CosmosHttpResponseError +from azure.cosmos.http_constants import ResourceType, StatusCodes, SubStatusCodes + + +class _FakeContext: + def __init__(self, endpoint): + self._endpoint = endpoint + + def get_primary(self): + return self._endpoint + + +class _FakeGlobalEndpointManager: + def __init__(self, regions, ppaf=False): + self._contexts = [_FakeContext(r) for r in regions] + self._ppaf = ppaf + self.recorded_failures = [] + + def get_applicable_read_regional_routing_contexts(self, request): # noqa: ARG002 + return self._contexts + + def get_applicable_write_regional_routing_contexts(self, request): # noqa: ARG002 + return self._contexts + + def get_region_name(self, endpoint, is_write): # noqa: ARG002 + return endpoint + + def is_per_partition_automatic_failover_enabled(self): + return self._ppaf + + def record_failure(self, request_params): + self.recorded_failures.append(request_params) + + +def _metadata_request(): + req = RequestObject(ResourceType.Collection, _OperationType.Read, {}) + req.availability_strategy = MetadataCrossRegionHedgingStrategy() + # Shorten the threshold so tests don't wait the full 1.5s. + req.availability_strategy.threshold_ms = 150 + return req + + +def _http_request(): + return HttpRequest("GET", "https://primary.documents.azure.com/") + + +class TestMetadataHedgingHelpers(unittest.TestCase): + def test_resolve_opt_in_tri_state(self): + self.assertTrue(resolve_metadata_hedging_opt_in(None, True)) + self.assertFalse(resolve_metadata_hedging_opt_in(None, False)) + self.assertTrue(resolve_metadata_hedging_opt_in(True, False)) + self.assertFalse(resolve_metadata_hedging_opt_in(False, True)) + + def test_is_regional_failure(self): + self.assertTrue(is_regional_failure(StatusCodes.SERVICE_UNAVAILABLE, None, None)) + self.assertTrue(is_regional_failure(StatusCodes.INTERNAL_SERVER_ERROR, None, None)) + self.assertTrue(is_regional_failure( + StatusCodes.FORBIDDEN, SubStatusCodes.DATABASE_ACCOUNT_NOT_FOUND, None)) + self.assertTrue(is_regional_failure(None, None, ServiceRequestError(message="boom"))) + self.assertFalse(is_regional_failure(StatusCodes.NOT_FOUND, None, None)) + self.assertFalse(is_regional_failure(StatusCodes.FORBIDDEN, None, None)) + self.assertFalse(is_regional_failure(None, None, None)) + + def test_is_supported_metadata_request(self): + self.assertTrue(is_supported_metadata_request( + RequestObject(ResourceType.Collection, _OperationType.Read, {}))) + self.assertTrue(is_supported_metadata_request( + RequestObject(ResourceType.PartitionKeyRange, _OperationType.ReadFeed, {}))) + self.assertTrue(is_supported_metadata_request( + RequestObject(ResourceType.PartitionKeyRange, _OperationType.Read, {}))) + self.assertFalse(is_supported_metadata_request( + RequestObject(ResourceType.Document, _OperationType.Read, {}))) + self.assertFalse(is_supported_metadata_request( + RequestObject(ResourceType.Collection, _OperationType.Create, {}))) + + +class TestMetadataHedgingHandler(unittest.TestCase): + def setUp(self): + self.handler = MetadataCrossRegionHedgingHandler(concurrency_budget=8) + self.gem = _FakeGlobalEndpointManager(["region-1", "region-2"]) + + def test_primary_wins_fast_no_hedge(self): + calls = [] + + def execute_fn(params, _req): + calls.append(params.is_hedging_request) + if params.is_hedging_request: + # Hedge should not win; sleep beyond test horizon. + time.sleep(5) + return ({"source": "hedge"}, {}) + return ({"source": "primary"}, {}) + + result, _ = self.handler.execute_request( + _metadata_request(), self.gem, _http_request(), execute_fn) + self.assertEqual(result["source"], "primary") + self.assertEqual(self.gem.recorded_failures, []) + + def test_hedge_wins_when_primary_slow(self): + def execute_fn(params, _req): + if params.is_hedging_request: + return ({"source": "hedge"}, {}) + time.sleep(5) + return ({"source": "primary"}, {}) + + result, _ = self.handler.execute_request( + _metadata_request(), self.gem, _http_request(), execute_fn) + self.assertEqual(result["source"], "hedge") + # Primary failure should be recorded when the hedge wins. + self.assertEqual(len(self.gem.recorded_failures), 1) + + def test_hedge_auth_reject_not_accepted(self): + def execute_fn(params, _req): + if params.is_hedging_request: + raise CosmosHttpResponseError(status_code=StatusCodes.FORBIDDEN, message="auth") + time.sleep(0.3) + return ({"source": "primary"}, {}) + + result, _ = self.handler.execute_request( + _metadata_request(), self.gem, _http_request(), execute_fn) + # The hedge's 401/403 must never win; primary's success is returned. + self.assertEqual(result["source"], "primary") + + def test_regional_failure_hedge_lets_primary_decide(self): + def execute_fn(params, _req): + if params.is_hedging_request: + raise CosmosHttpResponseError( + status_code=StatusCodes.SERVICE_UNAVAILABLE, message="503") + time.sleep(0.3) + return ({"source": "primary"}, {}) + + result, _ = self.handler.execute_request( + _metadata_request(), self.gem, _http_request(), execute_fn) + self.assertEqual(result["source"], "primary") + + def test_single_region_primary_only(self): + gem = _FakeGlobalEndpointManager(["region-1"]) + count = {"n": 0} + + def execute_fn(params, _req): # noqa: ARG001 + count["n"] += 1 + return ({"source": "primary"}, {}) + + result, _ = self.handler.execute_request( + _metadata_request(), gem, _http_request(), execute_fn) + self.assertEqual(result["source"], "primary") + self.assertEqual(count["n"], 1) + + def test_budget_exhausted_primary_only(self): + handler = MetadataCrossRegionHedgingHandler(concurrency_budget=1) + # Exhaust the budget so the next request must fall back to primary-only. + self.assertTrue(handler._budget.acquire(blocking=False)) # pylint: disable=protected-access + try: + count = {"n": 0} + + def execute_fn(params, _req): # noqa: ARG001 + count["n"] += 1 + return ({"source": "primary"}, {}) + + result, _ = handler.execute_request( + _metadata_request(), self.gem, _http_request(), execute_fn) + self.assertEqual(result["source"], "primary") + self.assertEqual(count["n"], 1) + finally: + handler._budget.release() # pylint: disable=protected-access + + def test_both_branches_fail_prefers_primary(self): + def execute_fn(params, _req): + if params.is_hedging_request: + raise CosmosHttpResponseError( + status_code=StatusCodes.SERVICE_UNAVAILABLE, message="hedge-503") + raise CosmosHttpResponseError( + status_code=StatusCodes.SERVICE_UNAVAILABLE, message="primary-503") + + with self.assertRaises(CosmosHttpResponseError) as ctx: + self.handler.execute_request( + _metadata_request(), self.gem, _http_request(), execute_fn) + self.assertIn("primary-503", str(ctx.exception)) + + def test_execute_metadata_hedging_sets_strategy(self): + req = RequestObject(ResourceType.Collection, _OperationType.Read, {}) + self.assertIsNone(req.availability_strategy) + + def execute_fn(params, _req): # noqa: ARG001 + return ({"source": "primary"}, {}) + + gem = _FakeGlobalEndpointManager(["region-1"]) + execute_metadata_hedging(self.handler, req, gem, _http_request(), execute_fn) + self.assertIsInstance(req.availability_strategy, MetadataCrossRegionHedgingStrategy) + + +class _FakeClient: + def __init__(self, handler, opt_in): + self._metadata_hedging_handler = handler + self._metadata_hedging_opt_in = opt_in + + +class TestMetadataHedgingApplicability(unittest.TestCase): + def setUp(self): + from azure.cosmos._synchronized_request import _is_metadata_hedging_applicable + self._is_applicable = _is_metadata_hedging_applicable + self.handler = MetadataCrossRegionHedgingHandler() + + def _request(self, resource_type=ResourceType.Collection, operation=_OperationType.Read): + return RequestObject(resource_type, operation, {}) + + def test_applicable_when_opt_in_true(self): + client = _FakeClient(self.handler, True) + gem = _FakeGlobalEndpointManager(["r1", "r2"], ppaf=False) + self.assertTrue(self._is_applicable(client, self._request(), gem)) + + def test_follows_ppaf_when_opt_in_none(self): + client = _FakeClient(self.handler, None) + self.assertTrue(self._is_applicable( + client, self._request(), _FakeGlobalEndpointManager(["r1", "r2"], ppaf=True))) + self.assertFalse(self._is_applicable( + client, self._request(), _FakeGlobalEndpointManager(["r1", "r2"], ppaf=False))) + + def test_not_applicable_without_handler(self): + client = _FakeClient(None, True) + gem = _FakeGlobalEndpointManager(["r1", "r2"], ppaf=True) + self.assertFalse(self._is_applicable(client, self._request(), gem)) + + def test_not_applicable_for_unsupported_resource(self): + client = _FakeClient(self.handler, True) + gem = _FakeGlobalEndpointManager(["r1", "r2"], ppaf=True) + req = self._request(resource_type=ResourceType.Document) + self.assertFalse(self._is_applicable(client, req, gem)) + + def test_not_applicable_for_hedging_request(self): + client = _FakeClient(self.handler, True) + gem = _FakeGlobalEndpointManager(["r1", "r2"], ppaf=True) + req = self._request() + req.is_hedging_request = True + self.assertFalse(self._is_applicable(client, req, gem)) + + +if __name__ == "__main__": + unittest.main() diff --git a/sdk/cosmos/azure-cosmos/tests/test_metadata_hedging_async.py b/sdk/cosmos/azure-cosmos/tests/test_metadata_hedging_async.py new file mode 100644 index 000000000000..098824b91064 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/test_metadata_hedging_async.py @@ -0,0 +1,180 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. + +"""Unit tests for cold-start metadata cache cross-region hedging (async).""" + +import asyncio +import unittest + +from azure.core.pipeline.transport import HttpRequest + +from azure.cosmos._availability_strategy_config import MetadataCrossRegionHedgingStrategy +from azure.cosmos._request_object import RequestObject +from azure.cosmos.aio._metadata_hedging import ( + MetadataCrossRegionAsyncHedgingHandler, + execute_metadata_hedging, +) +from azure.cosmos.documents import _OperationType +from azure.cosmos.exceptions import CosmosHttpResponseError +from azure.cosmos.http_constants import ResourceType, StatusCodes + + +class _FakeContext: + def __init__(self, endpoint): + self._endpoint = endpoint + + def get_primary(self): + return self._endpoint + + +class _FakeGlobalEndpointManagerAsync: + def __init__(self, regions, ppaf=False): + self._contexts = [_FakeContext(r) for r in regions] + self._ppaf = ppaf + self.recorded_failures = [] + + def get_applicable_read_regional_routing_contexts(self, request): # noqa: ARG002 + return self._contexts + + def get_applicable_write_regional_routing_contexts(self, request): # noqa: ARG002 + return self._contexts + + def get_region_name(self, endpoint, is_write): # noqa: ARG002 + return endpoint + + def is_per_partition_automatic_failover_enabled(self): + return self._ppaf + + async def record_failure(self, request_params): + self.recorded_failures.append(request_params) + + +def _metadata_request(): + req = RequestObject(ResourceType.Collection, _OperationType.Read, {}) + req.availability_strategy = MetadataCrossRegionHedgingStrategy() + req.availability_strategy.threshold_ms = 150 + return req + + +def _http_request(): + return HttpRequest("GET", "https://primary.documents.azure.com/") + + +class TestMetadataHedgingHandlerAsync(unittest.TestCase): + def setUp(self): + self.handler = MetadataCrossRegionAsyncHedgingHandler(concurrency_budget=8) + self.gem = _FakeGlobalEndpointManagerAsync(["region-1", "region-2"]) + + def test_primary_wins_fast_no_hedge(self): + async def run(): + async def execute_fn(params, _req): + if params.is_hedging_request: + await asyncio.sleep(5) + return ({"source": "hedge"}, {}) + return ({"source": "primary"}, {}) + + result, _ = await self.handler.execute_request( + _metadata_request(), self.gem, _http_request(), execute_fn) + self.assertEqual(result["source"], "primary") + self.assertEqual(self.gem.recorded_failures, []) + + asyncio.run(run()) + + def test_hedge_wins_when_primary_slow(self): + async def run(): + async def execute_fn(params, _req): + if params.is_hedging_request: + return ({"source": "hedge"}, {}) + await asyncio.sleep(5) + return ({"source": "primary"}, {}) + + result, _ = await self.handler.execute_request( + _metadata_request(), self.gem, _http_request(), execute_fn) + self.assertEqual(result["source"], "hedge") + self.assertEqual(len(self.gem.recorded_failures), 1) + + asyncio.run(run()) + + def test_hedge_auth_reject_not_accepted(self): + async def run(): + async def execute_fn(params, _req): + if params.is_hedging_request: + raise CosmosHttpResponseError(status_code=StatusCodes.FORBIDDEN, message="auth") + await asyncio.sleep(0.3) + return ({"source": "primary"}, {}) + + result, _ = await self.handler.execute_request( + _metadata_request(), self.gem, _http_request(), execute_fn) + self.assertEqual(result["source"], "primary") + + asyncio.run(run()) + + def test_single_region_primary_only(self): + async def run(): + gem = _FakeGlobalEndpointManagerAsync(["region-1"]) + count = {"n": 0} + + async def execute_fn(params, _req): # noqa: ARG001 + count["n"] += 1 + return ({"source": "primary"}, {}) + + result, _ = await self.handler.execute_request( + _metadata_request(), gem, _http_request(), execute_fn) + self.assertEqual(result["source"], "primary") + self.assertEqual(count["n"], 1) + + asyncio.run(run()) + + def test_budget_exhausted_primary_only(self): + async def run(): + handler = MetadataCrossRegionAsyncHedgingHandler(concurrency_budget=1) + await handler._budget.acquire() # pylint: disable=protected-access + try: + count = {"n": 0} + + async def execute_fn(params, _req): # noqa: ARG001 + count["n"] += 1 + return ({"source": "primary"}, {}) + + result, _ = await handler.execute_request( + _metadata_request(), self.gem, _http_request(), execute_fn) + self.assertEqual(result["source"], "primary") + self.assertEqual(count["n"], 1) + finally: + handler._budget.release() # pylint: disable=protected-access + + asyncio.run(run()) + + def test_both_branches_fail_prefers_primary(self): + async def run(): + async def execute_fn(params, _req): + if params.is_hedging_request: + raise CosmosHttpResponseError( + status_code=StatusCodes.SERVICE_UNAVAILABLE, message="hedge-503") + raise CosmosHttpResponseError( + status_code=StatusCodes.SERVICE_UNAVAILABLE, message="primary-503") + + with self.assertRaises(CosmosHttpResponseError) as ctx: + await self.handler.execute_request( + _metadata_request(), self.gem, _http_request(), execute_fn) + self.assertIn("primary-503", str(ctx.exception)) + + asyncio.run(run()) + + def test_execute_metadata_hedging_sets_strategy(self): + async def run(): + req = RequestObject(ResourceType.Collection, _OperationType.Read, {}) + self.assertIsNone(req.availability_strategy) + + async def execute_fn(params, _req): # noqa: ARG001 + return ({"source": "primary"}, {}) + + gem = _FakeGlobalEndpointManagerAsync(["region-1"]) + await execute_metadata_hedging(self.handler, req, gem, _http_request(), execute_fn) + self.assertIsInstance(req.availability_strategy, MetadataCrossRegionHedgingStrategy) + + asyncio.run(run()) + + +if __name__ == "__main__": + unittest.main() From 0125be73610ac4964fe8531d632dbedbe02696e6 Mon Sep 17 00:00:00 2001 From: Copilot <223556219+Copilot@users.noreply.github.com> Date: Mon, 15 Jun 2026 14:36:02 -0700 Subject: [PATCH 2/2] Scope metadata hedging to cold-start cache reads + skeptic-review fixes Self-review (Seon thorough) fixes: - Gate hedging on an explicit metadataCachePopulation request flag set only by the container-properties refresh, cold partition-key read, and routing-map fetch callsites, so public container reads and hybrid-search PK reads are no longer hedged. - Size the hedge thread pool to max(cpu_count, 2*budget) so each in-flight hedge always has its primary+hedge threads. - Remove the dead record_failure no-op (metadata cache health is account-global, not per-partition). - Clarify the threshold rationale and the async budget non-blocking check. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../cosmos/_availability_strategy_config.py | 8 +++-- .../azure/cosmos/_cosmos_client_connection.py | 11 ++++-- .../azure/cosmos/_metadata_hedging.py | 36 ++++++++----------- .../azure/cosmos/_request_object.py | 19 +++++++++- .../_routing/aio/routing_map_provider.py | 5 +++ .../cosmos/_routing/routing_map_provider.py | 5 +++ .../azure/cosmos/_synchronized_request.py | 2 ++ .../azure/cosmos/aio/_asynchronous_request.py | 2 ++ .../aio/_cosmos_client_connection_async.py | 9 +++-- .../azure/cosmos/aio/_metadata_hedging.py | 28 ++++++--------- .../tests/test_metadata_hedging.py | 29 ++++++++++++--- .../tests/test_metadata_hedging_async.py | 1 - 12 files changed, 101 insertions(+), 54 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_availability_strategy_config.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_availability_strategy_config.py index 2050662ebb69..4e08d45f6b95 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_availability_strategy_config.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_availability_strategy_config.py @@ -28,9 +28,11 @@ DEFAULT_THRESHOLD_STEPS_MS = 100 # Defaults for cold-start metadata cache hedging. These are SDK-derived and not -# customer-configurable. The threshold mirrors the .NET design (first-attempt -# control-plane timeout ~1s + a 500ms step => 1500ms). The concurrency budget -# caps the number of in-flight metadata hedges per client. +# customer-configurable. The threshold is an aggressive tail-latency trigger: metadata +# (control-plane) reads use a short read timeout (DBAReadTimeout, 3s by default), so the +# 1.5s hedge fires well before the primary's own timeout, giving a slow region a chance +# to be raced by a second region while the primary attempt is still outstanding. The +# concurrency budget caps the number of in-flight metadata hedges per client. DEFAULT_METADATA_HEDGING_THRESHOLD_MS = 1500 DEFAULT_METADATA_HEDGING_THRESHOLD_STEPS_MS = 500 DEFAULT_METADATA_HEDGING_CONCURRENCY_BUDGET = 8 diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py index f79a57ed5604..c2b88dae2040 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py @@ -72,7 +72,7 @@ from ._cosmos_responses import CosmosDict, CosmosList, CosmosItemPaged from ._range_partition_resolver import RangePartitionResolver from ._read_items_helper import ReadItemsHelperSync -from ._request_object import RequestObject +from ._request_object import RequestObject, METADATA_CACHE_POPULATION_OPTION from ._retry_utility import ConnectionRetryPolicy from ._routing import routing_map_provider from ._query_advisor import get_query_advice_info @@ -2983,6 +2983,7 @@ def Read( headers, options.get("partitionKey", None)) request_params.set_excluded_location_from_options(options) + request_params.set_metadata_cache_population_from_options(options) request_params.set_availability_strategy(options, self.availability_strategy) request_params.availability_strategy_executor = self.availability_strategy_executor base.set_session_token_header(self, headers, path, request_params, options) @@ -3318,6 +3319,7 @@ def __GetBodiesFromQueryResult(result: dict[str, Any]) -> list[dict[str, Any]]: options.get("partitionKey", None) ) request_params.set_excluded_location_from_options(options) + request_params.set_metadata_cache_population_from_options(options) request_params.set_availability_strategy(options, self.availability_strategy) request_params.availability_strategy_executor = self.availability_strategy_executor @@ -3956,7 +3958,8 @@ def refresh_routing_map_provider( def _refresh_container_properties_cache(self, container_link: str): # If container properties cache is stale, refresh it by reading the container. - container = self.ReadContainer(container_link, options=None) + container = self.ReadContainer( + container_link, options={METADATA_CACHE_POPULATION_OPTION: True}) # Only cache Container Properties that will not change in the lifetime of the container self._set_container_properties_cache(container_link, _build_properties_cache(container, container_link)) @@ -4003,7 +4006,9 @@ def _get_partition_key_definition( partition_key_definition = cached_container.get("partitionKey") # Else read the collection from backend and add it to the cache else: - container = self.ReadContainer(collection_link, options) + read_options = {**options} if options else {} + read_options[METADATA_CACHE_POPULATION_OPTION] = True + container = self.ReadContainer(collection_link, read_options) partition_key_definition = container.get("partitionKey") self._set_container_properties_cache(collection_link, _build_properties_cache(container, collection_link)) return partition_key_definition diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_metadata_hedging.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_metadata_hedging.py index a60063363844..b60daa0a5d6a 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_metadata_hedging.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_metadata_hedging.py @@ -35,7 +35,6 @@ import time from concurrent.futures import CancelledError, Future, ThreadPoolExecutor, as_completed from threading import Event, Semaphore -from types import SimpleNamespace from typing import Any, Callable, Dict, List, Optional, Tuple from azure.core.exceptions import ServiceRequestError, ServiceResponseError # pylint: disable=no-legacy-azure-core-http-response-import @@ -142,9 +141,13 @@ class MetadataCrossRegionHedgingHandler(AvailabilityStrategyHandlerMixin): """ def __init__(self, concurrency_budget: int = DEFAULT_METADATA_HEDGING_CONCURRENCY_BUDGET) -> None: - self._budget = Semaphore(max(1, concurrency_budget)) - # Long-lived executor shared across this client's metadata hedges. - self._executor = ThreadPoolExecutor(max_workers=os.cpu_count()) # pylint: disable=consider-using-with + budget = max(1, concurrency_budget) + self._budget = Semaphore(budget) + # Long-lived executor shared across this client's metadata hedges. Every in-flight + # hedge needs two worker threads (primary + hedge), so size the pool to at least + # 2 * budget; otherwise hedge tasks can starve behind primaries on low-core hosts. + max_workers = max(os.cpu_count() or 1, 2 * budget) + self._executor = ThreadPoolExecutor(max_workers=max_workers) # pylint: disable=consider-using-with def is_acceptable_winner( # pylint: disable=unused-argument self, @@ -192,7 +195,6 @@ def _send_with_delay( location_index: int, available_locations: List[str], complete_status: Event, - first_request_params_holder: SimpleNamespace, ) -> ResponseType: strategy = request_params.availability_strategy if strategy is None: @@ -212,8 +214,6 @@ def _send_with_delay( ) req = copy.deepcopy(request) - if location_index == 0: - first_request_params_holder.request_params = params if complete_status.is_set(): raise CancelledError("The request has been cancelled") @@ -252,14 +252,13 @@ def execute_request( return execute_request_fn(request_params, request) completion_status = Event() - first_request_params_holder: SimpleNamespace = SimpleNamespace(request_params=None) try: primary_future = self._executor.submit( self._send_with_delay, request_params, request, execute_request_fn, - 0, available_locations, completion_status, first_request_params_holder) + 0, available_locations, completion_status) hedge_future = self._executor.submit( self._send_with_delay, request_params, request, execute_request_fn, - 1, available_locations, completion_status, first_request_params_holder) + 1, available_locations, completion_status) futures: List[Future] = [primary_future, hedge_future] for completed_future in as_completed(futures): @@ -269,10 +268,11 @@ def execute_request( if self.is_acceptable_winner(result, exception, is_hedge=not is_primary): completion_status.set() - if not is_primary: - # Hedge won; record a failure for the primary region so health - # tracking can react to the slow/failed primary. - self._record_cancel_for_first_request(first_request_params_holder, global_endpoint_manager) + # Note: no per-region failure is recorded for the primary when the + # hedge wins. Metadata cache reads are account-global, not + # per-partition, so the circuit-breaker health tracker (keyed on + # partition-key ranges) does not apply here; a slow primary is not + # necessarily an unhealthy one. if exception is not None: raise exception return result # type: ignore[return-value] @@ -288,14 +288,6 @@ def execute_request( completion_status.set() self._budget.release() - def _record_cancel_for_first_request( - self, - request_params_holder: SimpleNamespace, - global_endpoint_manager: Any, - ) -> None: - if request_params_holder.request_params is not None: - global_endpoint_manager.record_failure(request_params_holder.request_params) - def execute_metadata_hedging( handler: MetadataCrossRegionHedgingHandler, diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_request_object.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_request_object.py index b27b5486cdcd..63470aa9ec88 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_request_object.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_request_object.py @@ -31,6 +31,11 @@ from .documents import _OperationType from .http_constants import ResourceType +# Internal options key used to mark a request as a cold-start metadata-cache +# population read (container / partition-key-range cache). Only set by the +# cache-population callsites; gates cold-start metadata hedging. +METADATA_CACHE_POPULATION_OPTION = "metadataCachePopulation" + class RequestObject(object): # pylint: disable=too-many-instance-attributes def __init__( @@ -59,6 +64,7 @@ def __init__( self.pk_val = pk_val self.retry_write: int = 0 self.is_hedging_request: bool = False # Flag to track if this is a hedged request + self.is_metadata_cache_population: bool = False # Cold-start metadata-cache read self.completion_status: Optional[Union[threading.Event, asyncio.Event]] = None def route_to_location_with_preferred_location_flag( # pylint: disable=name-too-long @@ -97,6 +103,17 @@ def set_excluded_location_from_options(self, options: Mapping[str, Any]) -> None if self._can_set_excluded_location(options): self.excluded_locations = options['excludedLocations'] + def set_metadata_cache_population_from_options(self, options: Mapping[str, Any]) -> None: # pylint: disable=name-too-long + """Mark this request as a cold-start metadata-cache population read when the + internal ``metadataCachePopulation`` options flag is set. + + :param options: The request options that may contain the internal flag. + :type options: Mapping[str, Any] + :return: None + """ + if options and options.get(METADATA_CACHE_POPULATION_OPTION): + self.is_metadata_cache_population = True + def set_retry_write(self, request_options: Mapping[str, Any], client_retry_write: int) -> None: if self.resource_type == ResourceType.Document: if request_options and request_options.get(Constants.Kwargs.RETRY_WRITE): @@ -149,7 +166,7 @@ def set_availability_strategy( def should_cancel_request(self) -> bool: """Check if this request should be cancelled due to parallel request completion. - + :return: True if request should be cancelled, False otherwise :rtype: bool """ diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py index 668adbae90d2..5c4e6cd04d28 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py @@ -28,6 +28,7 @@ from typing import Dict, Any, Optional, List, TYPE_CHECKING from azure.core.utils import CaseInsensitiveDict from ... import _base, http_constants +from ..._request_object import METADATA_CACHE_POPULATION_OPTION from ..collection_routing_map import CollectionRoutingMap from ...exceptions import CosmosHttpResponseError from .._routing_map_provider_common import ( @@ -406,6 +407,10 @@ async def _fetch_routing_map( change_feed_options = prepare_fetch_options_and_headers( current_previous_map, feed_options, base_kwargs_for_headers ) + # Mark these reads as cold-start metadata-cache population so the request + # layer can hedge them cross-region; other PartitionKeyRange readers + # (e.g. hybrid search) are not flagged and so are not hedged. + change_feed_options[METADATA_CACHE_POPULATION_OPTION] = True base_headers: Dict[str, Any] = base_kwargs_for_headers['headers'] while True: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py index c2cca7bb2ec1..977c0c92f407 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py @@ -28,6 +28,7 @@ from typing import Dict, Any, Optional, List, TYPE_CHECKING from azure.core.utils import CaseInsensitiveDict from .. import _base, http_constants +from .._request_object import METADATA_CACHE_POPULATION_OPTION from .collection_routing_map import CollectionRoutingMap from ..exceptions import CosmosHttpResponseError from ._routing_map_provider_common import ( @@ -373,6 +374,10 @@ def _fetch_routing_map( change_feed_options = prepare_fetch_options_and_headers( current_previous_map, feed_options, base_kwargs_for_headers ) + # Mark these reads as cold-start metadata-cache population so the request + # layer can hedge them cross-region; other PartitionKeyRange readers + # (e.g. hybrid search) are not flagged and so are not hedged. + change_feed_options[METADATA_CACHE_POPULATION_OPTION] = True base_headers: Dict[str, Any] = base_kwargs_for_headers['headers'] while True: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py index e26b34744cef..da0a6b02ad62 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py @@ -264,6 +264,8 @@ def _is_metadata_hedging_applicable(client, request_params: RequestObject, globa return False if request_params.is_hedging_request or request_params.availability_strategy is not None: return False + if not request_params.is_metadata_cache_population: + return False if not is_supported_metadata_request(request_params): return False opt_in = getattr(client, "_metadata_hedging_opt_in", None) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py index 016e6e8510f6..49151aae9a6f 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py @@ -235,6 +235,8 @@ def _is_metadata_hedging_applicable(client, request_params: RequestObject, globa return False if request_params.is_hedging_request or request_params.availability_strategy is not None: return False + if not request_params.is_metadata_cache_population: + return False if not is_supported_metadata_request(request_params): return False opt_in = getattr(client, "_metadata_hedging_opt_in", None) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py index 92f4cc16bff3..b95e0b9bf433 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py @@ -1312,6 +1312,7 @@ async def Read( headers, options.get("partitionKey", None)) request_params.set_excluded_location_from_options(options) + request_params.set_metadata_cache_population_from_options(options) await base.set_session_token_header_async(self, headers, path, request_params, options) request_params.set_availability_strategy(options, self.availability_strategy) request_params.availability_strategy_max_concurrency = self.availability_strategy_max_concurrency @@ -3116,6 +3117,7 @@ def __GetBodiesFromQueryResult(result: dict[str, Any]) -> list[dict[str, Any]]: options.get("partitionKey", None), ) request_params.set_excluded_location_from_options(options) + request_params.set_metadata_cache_population_from_options(options) request_params.set_availability_strategy(options, self.availability_strategy) request_params.availability_strategy_max_concurrency = self.availability_strategy_max_concurrency headers = base.GetHeaders(self, initial_headers, "get", path, id_, resource_type, @@ -3819,7 +3821,8 @@ async def refresh_routing_map_provider( async def _refresh_container_properties_cache(self, container_link: str): # If container properties cache is stale, refresh it by reading the container. - container = await self.ReadContainer(container_link, options=None) + container = await self.ReadContainer( + container_link, options={_request_object.METADATA_CACHE_POPULATION_OPTION: True}) # Only cache Container Properties that will not change in the lifetime of the container self._set_container_properties_cache(container_link, _build_properties_cache(container, container_link)) @@ -3916,7 +3919,9 @@ async def _get_partition_key_definition( partition_key_definition = cached_container.get("partitionKey") # Else read the collection from backend and add it to the cache else: - container = await self.ReadContainer(collection_link, options) + read_options = {**options} if options else {} + read_options[_request_object.METADATA_CACHE_POPULATION_OPTION] = True + container = await self.ReadContainer(collection_link, read_options) partition_key_definition = container.get("partitionKey") self._set_container_properties_cache(collection_link, _build_properties_cache(container, collection_link)) return partition_key_definition diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_metadata_hedging.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_metadata_hedging.py index 7a9d6a1297f7..1bb3a23c1c5a 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_metadata_hedging.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_metadata_hedging.py @@ -30,7 +30,6 @@ import copy import logging from asyncio import CancelledError, Event, Task -from types import SimpleNamespace from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple from azure.core.pipeline.transport import HttpRequest # pylint: disable=no-legacy-azure-core-http-response-import @@ -106,7 +105,6 @@ async def _send_with_delay( location_index: int, available_locations: List[str], complete_status: Event, - first_request_params_holder: SimpleNamespace, ) -> ResponseType: strategy = request_params.availability_strategy if strategy is None: @@ -126,8 +124,6 @@ async def _send_with_delay( ) req = copy.deepcopy(request) - if location_index == 0: - first_request_params_holder.request_params = params if complete_status.is_set(): raise CancelledError("The request has been cancelled") @@ -160,21 +156,24 @@ async def execute_request( logger.debug("Metadata hedge skipped: SingleRegion") return await execute_request_fn(request_params, request) + # Non-blocking budget check: locked() is True only when the budget is fully + # exhausted (value == 0). There is no await between this check and acquire(), + # so in single-threaded asyncio no other coroutine can consume the slot in + # between and acquire() completes immediately without blocking. if self._budget.locked(): logger.debug("Metadata hedge skipped: BudgetExhausted") return await execute_request_fn(request_params, request) await self._budget.acquire() completion_status = Event() - first_request_params_holder: SimpleNamespace = SimpleNamespace(request_params=None) active_tasks: List[Task] = [] try: primary_task = asyncio.create_task(self._send_with_delay( request_params, request, execute_request_fn, - 0, available_locations, completion_status, first_request_params_holder)) + 0, available_locations, completion_status)) hedge_task = asyncio.create_task(self._send_with_delay( request_params, request, execute_request_fn, - 1, available_locations, completion_status, first_request_params_holder)) + 1, available_locations, completion_status)) active_tasks = [primary_task, hedge_task] pending = set(active_tasks) @@ -187,9 +186,10 @@ async def execute_request( if self.is_acceptable_winner(result, exception, is_hedge=not is_primary): completion_status.set() - if not is_primary: - await self._record_cancel_for_first_request( - first_request_params_holder, global_endpoint_manager) + # Note: no per-region failure is recorded for the primary when the + # hedge wins. Metadata cache reads are account-global, not + # per-partition, so the circuit-breaker health tracker (keyed on + # partition-key ranges) does not apply here. if exception is not None: raise exception return result # type: ignore[return-value] @@ -208,14 +208,6 @@ async def execute_request( await asyncio.gather(*active_tasks, return_exceptions=True) self._budget.release() - async def _record_cancel_for_first_request( - self, - request_params_holder: SimpleNamespace, - global_endpoint_manager: Any, - ) -> None: - if request_params_holder.request_params is not None: - await global_endpoint_manager.record_failure(request_params_holder.request_params) - async def execute_metadata_hedging( handler: MetadataCrossRegionAsyncHedgingHandler, diff --git a/sdk/cosmos/azure-cosmos/tests/test_metadata_hedging.py b/sdk/cosmos/azure-cosmos/tests/test_metadata_hedging.py index 172ef8f5135d..bec6b6d49790 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_metadata_hedging.py +++ b/sdk/cosmos/azure-cosmos/tests/test_metadata_hedging.py @@ -97,6 +97,19 @@ def test_is_supported_metadata_request(self): self.assertFalse(is_supported_metadata_request( RequestObject(ResourceType.Collection, _OperationType.Create, {}))) + def test_set_metadata_cache_population_from_options(self): + from azure.cosmos._request_object import METADATA_CACHE_POPULATION_OPTION + req = RequestObject(ResourceType.Collection, _OperationType.Read, {}) + self.assertFalse(req.is_metadata_cache_population) + req.set_metadata_cache_population_from_options({METADATA_CACHE_POPULATION_OPTION: True}) + self.assertTrue(req.is_metadata_cache_population) + # Absent / falsey flag leaves it unset. + req2 = RequestObject(ResourceType.Collection, _OperationType.Read, {}) + req2.set_metadata_cache_population_from_options({}) + self.assertFalse(req2.is_metadata_cache_population) + req2.set_metadata_cache_population_from_options(None) + self.assertFalse(req2.is_metadata_cache_population) + class TestMetadataHedgingHandler(unittest.TestCase): def setUp(self): @@ -129,8 +142,6 @@ def execute_fn(params, _req): result, _ = self.handler.execute_request( _metadata_request(), self.gem, _http_request(), execute_fn) self.assertEqual(result["source"], "hedge") - # Primary failure should be recorded when the hedge wins. - self.assertEqual(len(self.gem.recorded_failures), 1) def test_hedge_auth_reject_not_accepted(self): def execute_fn(params, _req): @@ -224,14 +235,24 @@ def setUp(self): self._is_applicable = _is_metadata_hedging_applicable self.handler = MetadataCrossRegionHedgingHandler() - def _request(self, resource_type=ResourceType.Collection, operation=_OperationType.Read): - return RequestObject(resource_type, operation, {}) + def _request(self, resource_type=ResourceType.Collection, operation=_OperationType.Read, + cache_population=True): + req = RequestObject(resource_type, operation, {}) + req.is_metadata_cache_population = cache_population + return req def test_applicable_when_opt_in_true(self): client = _FakeClient(self.handler, True) gem = _FakeGlobalEndpointManager(["r1", "r2"], ppaf=False) self.assertTrue(self._is_applicable(client, self._request(), gem)) + def test_not_applicable_without_cache_population_flag(self): + client = _FakeClient(self.handler, True) + gem = _FakeGlobalEndpointManager(["r1", "r2"], ppaf=True) + # A supported metadata read that is NOT a cache-population read (e.g. a public + # container.read()) must not be hedged. + self.assertFalse(self._is_applicable(client, self._request(cache_population=False), gem)) + def test_follows_ppaf_when_opt_in_none(self): client = _FakeClient(self.handler, None) self.assertTrue(self._is_applicable( diff --git a/sdk/cosmos/azure-cosmos/tests/test_metadata_hedging_async.py b/sdk/cosmos/azure-cosmos/tests/test_metadata_hedging_async.py index 098824b91064..4c9b8cd0c81b 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_metadata_hedging_async.py +++ b/sdk/cosmos/azure-cosmos/tests/test_metadata_hedging_async.py @@ -91,7 +91,6 @@ async def execute_fn(params, _req): result, _ = await self.handler.execute_request( _metadata_request(), self.gem, _http_request(), execute_fn) self.assertEqual(result["source"], "hedge") - self.assertEqual(len(self.gem.recorded_failures), 1) asyncio.run(run())