diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 872e6a9ee37c..039c62deb91c 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -10,6 +10,7 @@ #### Bugs Fixed * Fixed bug where `CosmosClient` construction with AAD credentials would crash at startup if the semantic reranking inference endpoint environment variable was not set, even when semantic reranking was not being used. The inference service is now lazily initialized on first use. See [PR 46243](https://github.com/Azure/azure-sdk-for-python/pull/46243) * Fixed bug where region names in `preferred_locations` and `excluded_locations` (client-level and per-request) were not matched tolerantly for differences in case, whitespace, hyphens, and underscores. See [PR 46937](https://github.com/Azure/azure-sdk-for-python/pull/46937) +* Fixed bug where a `ValueError("Ranges overlap")` or an `AssertionError("code bug: returned overlapping ranges ... is empty")` from the partition key range cache could escape to the caller when the `/pkranges` response contained a transiently inconsistent snapshot (overlap or gap). See [PR 47091](https://github.com/Azure/azure-sdk-for-python/pull/47091) #### Other Changes * Reduced per-client memory overhead when partition-level circuit breaker (PPCB) is enabled by sharing the partition key range routing map cache across CosmosClient instances connected to the same endpoint, and stripping unused fields from cached partition key ranges using compact PKRange namedtuples. 
See [PR 46297](https://github.com/Azure/azure-sdk-for-python/pull/46297) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/_routing_map_provider_common.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/_routing_map_provider_common.py index ce579fdb258a..232d1c8fc288 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/_routing_map_provider_common.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/_routing_map_provider_common.py @@ -28,10 +28,22 @@ """ import logging +import random from typing import Any, Dict, List, Optional, Tuple from .. import _base, http_constants -from .collection_routing_map import CollectionRoutingMap, _build_routing_map_from_ranges +from ..exceptions import CosmosHttpResponseError +# ``_OverlapDetected`` and ``_GapDetected`` are imported (but not referenced +# inside this module) so that provider modules and tests can import them from +# this single module instead of reaching into ``collection_routing_map`` +# directly. Pylint reports ``unused-import`` on the ``from`` line as a whole +# (not on the individual names), so the disable must live on that line. +from .collection_routing_map import ( # pylint: disable=unused-import + CollectionRoutingMap, + _build_routing_map_from_ranges, + _OverlapDetected, + _GapDetected, +) from . import routing_range from .routing_range import ( PKRange, @@ -44,6 +56,98 @@ PAGE_SIZE_CHANGE_FEED = "-1" # Return all available changes +# Maximum retry attempts for transient full-load /pkranges inconsistencies +# (overlap OR gap) before surfacing a transient HTTP 503. Centralised here so +# the sync and async providers share one source of truth. +_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS = 3 +# Initial backoff between inconsistency retries; doubles each attempt. 
With +# ``_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS = 3``, attempt 3 raises 503 before +# sleeping, so only the post-attempt-1 and post-attempt-2 backoffs are slept -- +# deterministic worst case is the sum of the first two schedule entries +# (``_TRANSIENT_SNAPSHOT_RETRY_INITIAL_BACKOFF_SECONDS`` and that value +# doubled); expected wall time is half of that under full jitter. +_TRANSIENT_SNAPSHOT_RETRY_INITIAL_BACKOFF_SECONDS = 0.5 + + +def _jittered_backoff(backoff_seconds: float) -> float: + """Return a uniformly-jittered backoff in the range ``[0, backoff_seconds]``. + + Implements the "full jitter" strategy so concurrent retriers in different + processes do not retry in lockstep against the same gateway node. + + :param float backoff_seconds: Deterministic upper bound for the backoff, + in seconds. Must be non-negative. + :return: A uniformly-distributed sleep value in ``[0, backoff_seconds]``. + :rtype: float + """ + return random.uniform(0, backoff_seconds) + + +def _handle_transient_snapshot_retry_decision( + *, + retry_attempt_count: int, + collection_link: str, + logger: logging.Logger, # pylint: disable=redefined-outer-name +) -> float: + """Decide what to do after the full-load builder reported a transient + snapshot inconsistency (overlap or gap). + + Returns a jittered backoff for the caller to sleep before the next + attempt; raises :class:`CosmosHttpResponseError` (HTTP 503) once + ``_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS`` is reached. Under the default + budget the final attempt raises 503 before sleeping, so only the first + ``_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS - 1`` attempts produce a sleep + (deterministic upper bounds doubling from + ``_TRANSIENT_SNAPSHOT_RETRY_INITIAL_BACKOFF_SECONDS``). The caller + performs the actual sleep (``time.sleep`` vs ``await asyncio.sleep``), + which is the only line that differs between the sync and async + providers. + + :keyword int retry_attempt_count: Attempts so far, including the one + that just failed. 
Pass ``1`` after the first failure. + :keyword str collection_link: Used in log messages and the 503 body. + :keyword logging.Logger logger: Caller's module-level logger. + :return: Jittered backoff seconds in ``[0, deterministic_upper_bound]``. + :rtype: float + :raises CosmosHttpResponseError: When the attempt budget is exhausted. + """ + if retry_attempt_count >= _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS: + logger.error( + "Full-load routing-map fetch for collection '%s' detected a " + "transient snapshot inconsistency (overlap or gap) on every " + "one of %d attempt(s). Surfacing as transient HTTP 503 so the " + "caller's retry policy can take over.", + collection_link, + retry_attempt_count, + ) + raise CosmosHttpResponseError( + status_code=http_constants.StatusCodes.SERVICE_UNAVAILABLE, + message=( + "Failed to build routing map for collection '{}': transient " + "snapshot inconsistency (overlap or gap) persisted across {} " + "full-load attempt(s). Surfaced as a retryable transient " + "error so the upstream retry policy can take over." + ).format(collection_link, retry_attempt_count), + ) + + deterministic_backoff = ( + _TRANSIENT_SNAPSHOT_RETRY_INITIAL_BACKOFF_SECONDS * (2 ** (retry_attempt_count - 1)) + ) + jittered_backoff = _jittered_backoff(deterministic_backoff) + logger.warning( + "Full-load routing-map fetch for collection '%s' detected a transient " + "snapshot inconsistency (overlap or gap) (attempt %d/%d). 
Sleeping " + "%.2fs (jittered from upper bound %.2fs) and retrying.", + collection_link, + retry_attempt_count, + _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS, + jittered_backoff, + deterministic_backoff, + ) + return jittered_backoff + + + def is_cache_unchanged_since_previous( collection_routing_map_by_item: Dict[str, CollectionRoutingMap], @@ -149,7 +253,7 @@ def _resolve_endpoint(client: Any) -> str: class _IncrementalMergeFailed(Exception): - """Sentinel raised by :func:`process_fetched_ranges` when the + """Private exception type raised by :func:`process_fetched_ranges` when the incremental update cannot resolve all partition key ranges. The caller decides how to recover: retry the incremental fetch @@ -162,7 +266,7 @@ def process_fetched_ranges( collection_id: str, collection_link: str, new_etag: Optional[str], -) -> Optional[CollectionRoutingMap]: +) -> CollectionRoutingMap: """Turn raw PK-range results into a :class:`CollectionRoutingMap`. Handles both initial-load (when *previous_routing_map* is ``None``) @@ -177,10 +281,8 @@ def process_fetched_ranges( :param str collection_id: The ID of the collection. :param str collection_link: The link to the collection. :param str new_etag: The ETag from the change feed response, or ``None``. - :return: The new/updated routing map, or ``None`` when an - initial load yields no ranges. + :return: The new/updated routing map. :rtype: ~azure.cosmos._routing.collection_routing_map.CollectionRoutingMap - or None :raises _IncrementalMergeFailed: When the incremental path cannot resolve all ranges. The caller catches this and either retries the incremental fetch or falls back to a full refresh. 
@@ -257,7 +359,25 @@ def process_fetched_ranges( unresolved = next_unresolved - result = previous_routing_map.try_combine(range_tuples, effective_etag) + try: + result = previous_routing_map.try_combine(range_tuples, effective_etag) + except ValueError as overlap_error: + # Convert the overlap ``ValueError`` from ``try_combine`` into + # ``_IncrementalMergeFailed`` so the caller retries the incremental + # fetch and ultimately falls back to the full-load path (which has + # its own ``_OverlapDetected`` retry + 503 safety net). Narrow to + # the literal ``"Ranges overlap"`` prefix (kept stable in + # ``is_complete_set_of_range`` and pinned by the regression tests) + # so any future unrelated ``ValueError`` surfaces as a real bug. + if not str(overlap_error).startswith("Ranges overlap"): + raise + logger.warning( + "Incremental merge for collection '%s' produced overlapping ranges: %s. " + "Converting to _IncrementalMergeFailed so the caller retries / " + "falls back to a full refresh.", + collection_link, str(overlap_error), + ) + raise _IncrementalMergeFailed() from overlap_error if not result: logger.warning( "Incremental merge resulted in incomplete routing map for " diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py index 4cfb429ab7e3..1604b4734b48 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py @@ -38,6 +38,9 @@ determine_refresh_action, get_smart_overlapping_ranges, _IncrementalMergeFailed, + _OverlapDetected, + _GapDetected, + _handle_transient_snapshot_retry_decision, ) @@ -103,6 +106,8 @@ # Number of extra incremental attempts after an incomplete incremental merge # before falling back to a full routing-map refresh. 
_INCOMPLETE_ROUTING_MAP_MAX_RETRIES = 1 + + class PartitionKeyRangeCache(object): """ PartitionKeyRangeCache provides list of effective partition key ranges for a @@ -307,9 +312,12 @@ async def get_routing_map( **kwargs ) - # Update the cache. - if new_routing_map: - self._collection_routing_map_by_item[collection_id] = new_routing_map + # ``_fetch_routing_map`` always returns a populated + # ``CollectionRoutingMap`` on success and raises otherwise -- + # No defensive None-check needed; one + # would only mask a future regression by silently leaving + # the cache empty instead of surfacing the failure. + self._collection_routing_map_by_item[collection_id] = new_routing_map return self._collection_routing_map_by_item.get(collection_id) @@ -321,7 +329,7 @@ async def _fetch_routing_map( previous_routing_map: Optional[CollectionRoutingMap], feed_options: Optional[Dict[str, Any]], **kwargs - ) -> Optional[CollectionRoutingMap]: + ) -> CollectionRoutingMap: """Fetches or updates the routing map using an incremental change feed. This method handles both the initial loading of a collection's routing @@ -331,18 +339,30 @@ async def _fetch_routing_map( of inconsistencies during an incremental update, it automatically falls back to a full refresh. + Always returns a populated :class:`CollectionRoutingMap` on success. + Failure modes raise an exception rather than returning ``None``: + ``CosmosHttpResponseError`` for the underlying network call (including + the transient HTTP 503 raised once the snapshot-inconsistency retry + budget is exhausted), or the internal ``_IncrementalMergeFailed`` + signal when the incremental-merge path cannot make progress and there + is no previous map to fall back on. + :param str collection_link: The link to the collection. :param str collection_id: The ID of the collection. :param previous_routing_map: The last known routing map for incremental updates. 
:type previous_routing_map: azure.cosmos.routing.collection_routing_map.CollectionRoutingMap or None :param feed_options: Options for the change feed request. :type feed_options: dict or None - :return: The updated or newly created CollectionRoutingMap, or None if the update fails. - :rtype: azure.cosmos.routing.collection_routing_map.CollectionRoutingMap or None - :raises CosmosHttpResponseError: If the underlying request to fetch ranges fails. + :return: The updated or newly created CollectionRoutingMap. + :rtype: azure.cosmos.routing.collection_routing_map.CollectionRoutingMap + :raises CosmosHttpResponseError: If the underlying ``/pkranges`` fetch + fails, or if every snapshot-inconsistency retry exhausts the + budget (surfaced as HTTP 503 so the upstream retry policy can + take over). """ current_previous_map = previous_routing_map incomplete_attempt_count = 0 + inconsistency_attempt_count = 0 while True: request_kwargs = dict(kwargs) @@ -398,6 +418,22 @@ async def _fetch_routing_map( continue raise + except (_OverlapDetected, _GapDetected): + # Reset ``current_previous_map`` to ``None`` so the next + # iteration runs the full-load path: we do not want to keep + # retrying an incremental fetch against the same inconsistent + # base. ``_handle_transient_snapshot_retry_decision`` returns + # the backoff or raises a 503 once the attempt budget is + # exhausted. 
+ inconsistency_attempt_count += 1 + backoff = _handle_transient_snapshot_retry_decision( + retry_attempt_count=inconsistency_attempt_count, + collection_link=collection_link, + logger=logger, + ) + await asyncio.sleep(backoff) + current_previous_map = None + continue async def get_range_by_partition_key_range_id( self, diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/collection_routing_map.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/collection_routing_map.py index ba719f955a72..5d5bd1983106 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/collection_routing_map.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/collection_routing_map.py @@ -29,6 +29,34 @@ from azure.cosmos._routing import routing_range from azure.cosmos._routing.routing_range import PartitionKeyRange, PKRange + +class _OverlapDetected(Exception): + """Raised by :func:`_build_routing_map_from_ranges` to signal that the + gateway returned a ``/pkranges`` snapshot with overlapping ranges. + + Intentionally NOT a ``ValueError`` subclass: cache-layer code historically + catches ``ValueError`` broadly, so a plain ``ValueError`` would be + silently swallowed and surface later as an empty result from + ``get_overlapping_ranges``. Each provider's ``_fetch_routing_map`` + catches this type explicitly and applies the retry-on-overlap policy. + """ + + +class _GapDetected(Exception): + """Raised by :func:`_build_routing_map_from_ranges` to signal that the + gateway returned a ``/pkranges`` snapshot with a hole in the key space + (i.e. one range's upper bound is strictly less than the next range's + lower bound, leaving some keys uncovered). + + Same root cause as :class:`_OverlapDetected` -- a transient gateway + snapshot served mid-propagation -- but the opposite symptom. Previously + this case escaped as ``AssertionError("code bug: returned overlapping + ranges ... is empty")`` from ``SmartRoutingMapProvider``'s generator + when the downstream query tried to use the empty result.
Caught + alongside ``_OverlapDetected`` and given the same bounded retry + + typed HTTP 503 treatment so the upstream retry policy can absorb it. + """ + # pylint: disable=line-too-long class CollectionRoutingMap(object): """Stores partition key ranges in an efficient way with some additional @@ -196,7 +224,23 @@ def is_complete_set_of_range(ordered_partition_key_range_list): if not isComplete: if previousRange[PartitionKeyRange.MaxExclusive] > currentRange[PartitionKeyRange.MinInclusive]: - raise ValueError("Ranges overlap") + # Include the offending pair in the message so whoever + # investigates the next occurrence has actionable + # diagnostics without having to reproduce the failure + # under a debugger. Keep the literal substring + # "Ranges overlap" for backwards compatibility with + # any caller that pattern-matches on it. + raise ValueError( + "Ranges overlap: previous range id={!r} ({!r} -> {!r}) " + "overlaps current range id={!r} ({!r} -> {!r})".format( + previousRange.get(PartitionKeyRange.Id), + previousRange[PartitionKeyRange.MinInclusive], + previousRange[PartitionKeyRange.MaxExclusive], + currentRange.get(PartitionKeyRange.Id), + currentRange[PartitionKeyRange.MinInclusive], + currentRange[PartitionKeyRange.MaxExclusive], + ) + ) break return isComplete @@ -265,12 +309,14 @@ def _build_routing_map_from_ranges( new_etag, collection_link: str, _logger -) -> Optional['CollectionRoutingMap']: +) -> 'CollectionRoutingMap': """Build a complete routing map from a full load of partition key ranges. Filters out parent (gone) ranges and validates that the remaining ranges - form a complete, gap-free partition key space. Returns None if the ranges - are incomplete. + form a complete, gap-free partition key space. Raises ``_OverlapDetected`` + when the ranges overlap and ``_GapDetected`` when they have a gap; both + are transient gateway-snapshot inconsistencies the caller is expected to + retry. 
This is shared between the sync and async PartitionKeyRangeCache to avoid code duplication — the logic is purely synchronous. @@ -280,9 +326,27 @@ def _build_routing_map_from_ranges( :param str new_etag: The ETag from the change feed response. :param str collection_link: The collection link, used for log messages. :param logging.Logger _logger: Logger instance for error reporting. - :return: A complete CollectionRoutingMap, or None if the ranges are incomplete. - :rtype: Optional[CollectionRoutingMap] + :return: A complete CollectionRoutingMap. + :rtype: CollectionRoutingMap + :raises _OverlapDetected: If the ranges contain an overlap that could not + be resolved from this single snapshot. The caller should retry the + fetch; see :class:`_OverlapDetected` for the rationale. + :raises _GapDetected: If the ranges have a hole in the key space. The + caller should retry the fetch; see :class:`_GapDetected` for the + rationale. """ + # Dedup the input by id before validation. Paginated ``/pkranges`` + # responses can repeat the same range id across pages when consecutive + # pages are served from gateway nodes with slightly different cached + # views; without dedup the duplicate trips the overlap check on two + # identical entries. Last-write-wins is safe in practice; if duplicates + # carry asymmetric metadata, the overlap path is handled by the + # ``_OverlapDetected`` retry flow. 
+ deduped_by_id: dict = {} + for r in ranges: + deduped_by_id[r[PartitionKeyRange.Id]] = r + ranges = list(deduped_by_id.values()) + gone_range_ids = set() for r in ranges: if PartitionKeyRange.Parents in r and r[PartitionKeyRange.Parents]: @@ -294,19 +358,45 @@ def _build_routing_map_from_ranges( ] range_tuples = [(r, True) for r in filtered_ranges] - routing_map = CollectionRoutingMap.CompleteRoutingMap( - range_tuples, - collection_id, - new_etag - ) + try: + routing_map = CollectionRoutingMap.CompleteRoutingMap( + range_tuples, + collection_id, + new_etag + ) + except ValueError as overlap_error: + # Convert the overlap ``ValueError`` raised by ``is_complete_set_of_range`` + # into ``_OverlapDetected`` so the caller can apply the retry-on-overlap + # policy. Narrow to the literal ``"Ranges overlap"`` prefix (kept stable + # in ``is_complete_set_of_range`` and pinned by the regression tests) + # so any future unrelated ``ValueError`` from ``CompleteRoutingMap`` + # surfaces as a real bug instead of being silently coerced into a retry. + if not str(overlap_error).startswith("Ranges overlap"): + raise + _logger.warning( + "Full load of routing map for collection '%s' detected overlapping " + "partition key ranges: %s. Signalling caller to retry the " + "/pkranges fetch.", + collection_link, str(overlap_error), + ) + raise _OverlapDetected() from overlap_error if not routing_map: - _logger.error( - "Full load of routing map for collection '%s' failed: " - "the service returned an incomplete set of partition key ranges. " - "This can happen due to a transient service issue or a split completing mid-fetch.", - collection_link + # ``CompleteRoutingMap`` returns None when ``is_complete_set_of_range`` + # returns False without raising -- the gap case (``prev.max < cur.min``) + # or an empty input list. Same root cause as the overlap raise above + # (transient inconsistent gateway snapshot), opposite symptom: a hole + # in the key space rather than a duplicated one. 
Raise ``_GapDetected`` + # so the caller applies the same bounded retry + 503-on-exhaustion + # treatment as overlap. Without this raise, the + # ``None`` would surface as ``AssertionError("code bug: returned + # overlapping ranges ... is empty")`` from ``SmartRoutingMapProvider``. + _logger.warning( + "Full load of routing map for collection '%s' returned an " + "incomplete set of partition key ranges (gap in key space). " + "Signalling caller to retry the /pkranges fetch.", + collection_link, ) - return None + raise _GapDetected() return routing_map diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py index 92abe54cba94..2772552ce8f5 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py @@ -23,11 +23,16 @@ Cosmos database service. """ import threading +import time import logging from typing import Dict, Any, Optional, List, TYPE_CHECKING from azure.core.utils import CaseInsensitiveDict from .. import _base, http_constants -from .collection_routing_map import CollectionRoutingMap +from .collection_routing_map import ( + CollectionRoutingMap, + _OverlapDetected, + _GapDetected, +) from ..exceptions import CosmosHttpResponseError from ._routing_map_provider_common import ( _resolve_endpoint, @@ -37,6 +42,7 @@ determine_refresh_action, get_smart_overlapping_ranges, _IncrementalMergeFailed, + _handle_transient_snapshot_retry_decision, ) if TYPE_CHECKING: @@ -93,6 +99,8 @@ # Number of extra incremental attempts after an incomplete incremental merge # before falling back to a full routing-map refresh.
_INCOMPLETE_ROUTING_MAP_MAX_RETRIES = 1 + + class PartitionKeyRangeCache(object): """ PartitionKeyRangeCache provides list of effective partition key ranges for a @@ -277,9 +285,12 @@ def get_routing_map( feed_options, **kwargs ) - - if new_routing_map: - self._collection_routing_map_by_item[collection_id] = new_routing_map + # ``_fetch_routing_map`` always returns a populated + # ``CollectionRoutingMap`` on success and raises otherwise -- + # No defensive None-check needed; one + # would only mask a future regression by silently leaving + # the cache empty instead of surfacing the failure. + self._collection_routing_map_by_item[collection_id] = new_routing_map return self._collection_routing_map_by_item.get(collection_id) @@ -292,7 +303,7 @@ def _fetch_routing_map( previous_routing_map: Optional[CollectionRoutingMap], feed_options: Optional[Dict[str, Any]], **kwargs - ) -> Optional[CollectionRoutingMap]: + ) -> CollectionRoutingMap: """Fetches or updates the routing map using an incremental change feed. @@ -303,17 +314,30 @@ def _fetch_routing_map( of inconsistencies during an incremental update, it automatically falls back to a full refresh. + Always returns a populated :class:`CollectionRoutingMap` on success. + Failure modes raise an exception rather than returning ``None``: + ``CosmosHttpResponseError`` for the underlying network call (including + the transient HTTP 503 raised once the snapshot-inconsistency retry + budget is exhausted), or the internal ``_IncrementalMergeFailed`` + signal when the incremental-merge path cannot make progress and there + is no previous map to fall back on. + :param str collection_link: The link to the collection. :param str collection_id: The unique identifier of the collection. :param previous_routing_map: The routing map to be updated. If None, a full load is performed. :type previous_routing_map: azure.cosmos.routing.collection_routing_map.CollectionRoutingMap :param feed_options: Options for the change feed request. 
:type feed_options: dict or None - :return: The new or updated CollectionRoutingMap, or None if retrieval fails. - :rtype: azure.cosmos.routing.collection_routing_map.CollectionRoutingMap or None + :return: The new or updated CollectionRoutingMap. + :rtype: azure.cosmos.routing.collection_routing_map.CollectionRoutingMap + :raises CosmosHttpResponseError: If the underlying ``/pkranges`` fetch + fails, or if every snapshot-inconsistency retry exhausts the + budget (surfaced as HTTP 503 so the upstream retry policy can + take over). """ current_previous_map = previous_routing_map incomplete_attempt_count = 0 + inconsistency_attempt_count = 0 while True: request_kwargs = dict(kwargs) @@ -368,6 +392,22 @@ def _fetch_routing_map( continue raise + except (_OverlapDetected, _GapDetected): + # Reset ``current_previous_map`` to ``None`` so the next + # iteration runs the full-load path: we do not want to keep + # retrying an incremental fetch against the same inconsistent + # base. ``_handle_transient_snapshot_retry_decision`` returns + # the backoff or raises a 503 once the attempt budget is + # exhausted. 
+ inconsistency_attempt_count += 1 + backoff = _handle_transient_snapshot_retry_decision( + retry_attempt_count=inconsistency_attempt_count, + collection_link=collection_link, + logger=logger, + ) + time.sleep(backoff) + current_previous_map = None + continue def get_overlapping_ranges(self, collection_link, partition_key_ranges, feed_options, **kwargs): """Given a partition key range and a collection, return the list of diff --git a/sdk/cosmos/azure-cosmos/cspell.json b/sdk/cosmos/azure-cosmos/cspell.json index d71f63bc08b7..62b0df0f579f 100644 --- a/sdk/cosmos/azure-cosmos/cspell.json +++ b/sdk/cosmos/azure-cosmos/cspell.json @@ -2,7 +2,11 @@ "ignoreWords": [ "hdrh", "hdrhistogram", + "dedup", + "deduped", + "deduping", "dedupe", + "dedups", "perfdb", "perfresults", "pkrange", diff --git a/sdk/cosmos/azure-cosmos/tests/routing/test_collection_routing_map.py b/sdk/cosmos/azure-cosmos/tests/routing/test_collection_routing_map.py index d4f261ee29ae..cdf86550b6a8 100644 --- a/sdk/cosmos/azure-cosmos/tests/routing/test_collection_routing_map.py +++ b/sdk/cosmos/azure-cosmos/tests/routing/test_collection_routing_map.py @@ -7,7 +7,12 @@ import pytest import azure.cosmos._routing.routing_range as routing_range -from azure.cosmos._routing.collection_routing_map import CollectionRoutingMap, _build_routing_map_from_ranges +from azure.cosmos._routing.collection_routing_map import ( + CollectionRoutingMap, + _build_routing_map_from_ranges, + _OverlapDetected, + _GapDetected, +) @pytest.mark.cosmosEmulator @@ -412,16 +417,23 @@ def test_build_routing_map_no_parents_passes_through_all(self): ids = [r['id'] for r in result._orderedPartitionKeyRanges] self.assertEqual(ids, ['0', '1']) - def test_build_routing_map_returns_none_for_incomplete_ranges(self): - """_build_routing_map_from_ranges returns None when the filtered ranges - don't form a complete partition key space (gap exists).""" + def test_build_routing_map_raises_gap_detected_for_incomplete_ranges(self): + 
"""_build_routing_map_from_ranges raises ``_GapDetected`` when the + filtered ranges don't form a complete partition key space (gap exists). + The caller catches this and applies the same bounded-retry-then-503 + policy as ``_OverlapDetected``; without this raise the downstream + ``SmartRoutingMapProvider`` would crash with ``AssertionError("code + bug: returned overlapping ranges ... is empty")`` when the resulting + empty range list flows into its generator.""" _logger = logging.getLogger("test") ranges = [ {'id': '0', 'minInclusive': '', 'maxExclusive': '80'}, # Gap from '80' to 'FF' — incomplete ] - result = _build_routing_map_from_ranges(ranges, 'coll1', '"etag-4"', 'dbs/db/colls/coll1', _logger) - self.assertIsNone(result, "Should return None for incomplete range coverage") + with self.assertRaises(_GapDetected): + _build_routing_map_from_ranges( + ranges, 'coll1', '"etag-4"', 'dbs/db/colls/coll1', _logger + ) def test_build_routing_map_empty_parents_list_not_treated_as_gone(self): """_build_routing_map_from_ranges does NOT filter a range whose 'parents' @@ -449,6 +461,168 @@ def test_build_routing_map_stores_etag(self): result_none_etag = _build_routing_map_from_ranges(ranges, 'coll1', None, 'link', _logger) self.assertIsNone(result_none_etag.change_feed_etag) + # ========================================================================== + # Regression tests for transient /pkranges snapshot inconsistencies. + # The builder must either succeed (after deduping duplicates by id) or + # raise ``_OverlapDetected`` — never let a bare ``ValueError`` reach + # ``get_overlapping_ranges``, which would silently return an empty list. 
+ # ========================================================================== + + def test_full_load_dedups_duplicate_range_id_across_pages_mode_1(self): + """Duplicate range across pages: when /pkranges pagination + returns the same range id on two consecutive pages because two gateway + nodes serve from slightly different cached snapshots, the SDK should + dedup by id (last-write-wins) before validating and produce a valid + routing map — not raise.""" + _logger = logging.getLogger("test") + # Range id '1' appears twice because page boundary fell between two + # gateway nodes with one-tick-out-of-sync caches. + ranges = [ + {'id': '0', 'minInclusive': '', 'maxExclusive': '40'}, + {'id': '1', 'minInclusive': '40', 'maxExclusive': '80'}, + {'id': '1', 'minInclusive': '40', 'maxExclusive': '80'}, # duplicate from next page + {'id': '2', 'minInclusive': '80', 'maxExclusive': 'C0'}, + {'id': '3', 'minInclusive': 'C0', 'maxExclusive': 'FF'}, + ] + result = _build_routing_map_from_ranges(ranges, 'coll1', '"etag-dup"', 'dbs/db/colls/coll1', _logger) + + self.assertIsNotNone( + result, + "Duplicate range id across pages should be deduped, not crash the builder." 
+ ) + ids = [r['id'] for r in result._orderedPartitionKeyRanges] + self.assertEqual(ids, ['0', '1', '2', '3']) + + def test_full_load_raises_overlap_sentinel_for_stale_parent_with_missing_child_refs_mode_2(self): + """Stale parent with children missing parent reference: when a + gateway node returns a freshly-split parent alongside its children but + the children's 'parents' fields fail to reference the parent (because + the lineage metadata hadn't fully propagated when that node served the + page), _build_routing_map_from_ranges should convert the underlying + ValueError into _OverlapDetected so the caller can retry — and the + exception must NOT be a plain ValueError, since a plain ValueError + would escape to the cache layer and silently return empty results + from get_overlapping_ranges.""" + _logger = logging.getLogger("test") + # Parent '10' was split into '10/0' and '10/1', but the children on + # this page lost their 'parents': ['10'] reference. The one-direction + # parent filter cannot remove '10' because no surviving range names it. + ranges = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, # unaffected left neighbor + {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, # stale parent + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, # SHOULD have parents=['10'] + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, # SHOULD have parents=['10'] + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, # unaffected right neighbor + ] + with self.assertRaises(_OverlapDetected): + _build_routing_map_from_ranges(ranges, 'coll1', '"etag-stale-parent"', 'dbs/db/colls/coll1', _logger) + + def test_full_load_raises_overlap_sentinel_for_grandparent_surviving_cascade_split_mode_3(self): + """Grandparent surviving a cascade split: when two generations + of splits have completed and the intermediate parent drops its + reference to the grandparent, the grandparent survives every available + defense and overlaps its grandchildren. 
_build_routing_map_from_ranges + should convert this into _OverlapDetected so the caller can retry.""" + _logger = logging.getLogger("test") + # '10' split into '10/0' and '10/1'; then '10/0' split into '10/0/0' + # and '10/0/1'. The grandchildren reference '10/0' correctly, but + # '10/0' and '10/1' both lost their 'parents': ['10'] reference, so + # the parent filter only collects {'10/0'} and leaves '10' in place. + ranges = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, # grandparent + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, # SHOULD have parents=['10'] + {'id': '10/0/0', 'minInclusive': '80', 'maxExclusive': '88', 'parents': ['10/0']}, + {'id': '10/0/1', 'minInclusive': '88', 'maxExclusive': '90', 'parents': ['10/0']}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, # SHOULD have parents=['10'] + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + with self.assertRaises(_OverlapDetected): + _build_routing_map_from_ranges(ranges, 'coll1', '"etag-cascade"', 'dbs/db/colls/coll1', _logger) + + def test_full_load_raises_gap_detected_for_hole_in_key_space(self): + """Mirror of the overlap scenarios for the gap side: when one gateway + node has propagated the deletion of a parent but the children have + not yet appeared in its view, the response is missing the range that + used to cover the deleted parent's span. The builder must convert + this into ``_GapDetected`` so the caller applies the same bounded + retry. Without this, the empty ``get_overlapping_ranges`` result + crashes ``SmartRoutingMapProvider`` with ``AssertionError("code + bug: ...")``.""" + _logger = logging.getLogger("test") + # Parent '10' covered "80" -> "A0" and was just deleted; its + # children 10/0 and 10/1 have not yet propagated to this node's view. 
+ ranges = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + # Hole "80" -> "A0" is unclaimed. + ] + with self.assertRaises(_GapDetected): + _build_routing_map_from_ranges(ranges, 'coll1', '"etag-gap"', 'dbs/db/colls/coll1', _logger) + + def test_gap_detected_is_not_a_value_error(self): + """Same invariant as for ``_OverlapDetected``: ``_GapDetected`` must + be distinguishable by type and must not be silently absorbed by any + ``except ValueError`` catch in the cache layer.""" + self.assertFalse( + issubclass(_GapDetected, ValueError), + "_GapDetected must not inherit from ValueError -- that would " + "allow legacy ValueError catches to absorb the retry signal." + ) + self.assertTrue(issubclass(_GapDetected, Exception)) + + def test_overlap_sentinel_is_not_a_value_error(self): + """``_OverlapDetected`` must not be a subclass of ``ValueError`` (or any + other exception type that callers in the cache layer have historically + caught and swallowed). If it were, the very catch sites that today let + the bug crash the scan would silently absorb the signal and convert + it into the same empty-result correctness bug we were trying to avoid. + + The exception must be plainly an ``Exception``, distinguishable by + type, so the dedicated retry loop in ``_fetch_routing_map`` is the + only handler.""" + self.assertFalse( + issubclass(_OverlapDetected, ValueError), + "_OverlapDetected must not inherit from ValueError -- that would " + "allow legacy ValueError catches to absorb the retry signal." + ) + # Should still be a concrete Exception subclass (sanity check). + self.assertTrue(issubclass(_OverlapDetected, Exception)) + + def test_overlap_error_message_identifies_offending_ranges(self): + """When is_complete_set_of_range is called directly with genuinely + overlapping input (i.e. 
an unrecoverable programmer error rather than + a transient gateway snapshot), the ValueError message should identify + which two ranges overlapped so that whoever investigates the next + occurrence has actionable diagnostics out of the box. + + Also pins the literal ``"Ranges overlap"`` prefix on the message. + The full-load guard in ``_build_routing_map_from_ranges`` and the + incremental-merge guard in ``process_fetched_ranges`` both rely on + ``str(err).startswith("Ranges overlap")`` to distinguish the + snapshot-inconsistency case from any unrelated future ``ValueError``. + If this prefix ever changes, those guards will silently start + re-raising what they were meant to convert, so the prefix is part + of the contract and gets asserted here.""" + ranges = [ + {'id': 'A', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': 'B', 'minInclusive': '40', 'maxExclusive': 'FF'}, # overlaps with A + ] + with self.assertRaises(ValueError) as ctx: + CollectionRoutingMap.is_complete_set_of_range(ranges) + + msg = str(ctx.exception) + self.assertTrue( + msg.startswith("Ranges overlap"), + "Message must start with the literal 'Ranges overlap' prefix that " + "the production guards in _build_routing_map_from_ranges and " + "process_fetched_ranges match against. 
Got: {!r}".format(msg), + ) + self.assertIn('A', msg, "Error message should name the previous (offending) range id.") + self.assertIn('B', msg, "Error message should name the current (offending) range id.") + self.assertIn('80', msg, "Error message should include the previous range's maxExclusive.") + self.assertIn('40', msg, "Error message should include the current range's minInclusive.") + if __name__ == '__main__': unittest.main() diff --git a/sdk/cosmos/azure-cosmos/tests/routing/test_routing_map_provider.py b/sdk/cosmos/azure-cosmos/tests/routing/test_routing_map_provider.py index 56f6637ff454..153588350305 100644 --- a/sdk/cosmos/azure-cosmos/tests/routing/test_routing_map_provider.py +++ b/sdk/cosmos/azure-cosmos/tests/routing/test_routing_map_provider.py @@ -12,8 +12,9 @@ from azure.cosmos import http_constants from typing import Optional, Mapping, Any -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import threading +from azure.cosmos.exceptions import CosmosHttpResponseError @pytest.mark.cosmosEmulator class TestRoutingMapProvider(unittest.TestCase): @@ -336,14 +337,18 @@ def test_is_cache_stale_etag_logic(self): mock_map2.change_feed_etag = cached_map.change_feed_etag self.assertFalse(provider._is_cache_stale(collection_id, mock_map2)) - def test_fetch_routing_map_full_load_with_incomplete_ranges_returns_none(self): - """When a full load (previous_routing_map=None) returns gapped ranges, returns None immediately.""" + def test_fetch_routing_map_full_load_with_incomplete_ranges_surfaces_503(self): + """When a full load (previous_routing_map=None) repeatedly returns + gapped ranges, the retry budget should be exhausted and the provider + should surface a retryable HTTP 503.""" incomplete_ranges = [ {'id': '0', 'minInclusive': '', 'maxExclusive': '80'} # Gap from 80 to FF ] + call_count = {'count': 0} class IncompleteClient: def _ReadPartitionKeyRanges(self, _collection_link, feed_options=None, **kwargs): + 
call_count['count'] += 1 TestRoutingMapProvider._capture_internal_headers(kwargs, '"incomplete-etag"') return incomplete_ranges @@ -352,13 +357,16 @@ def _ReadPartitionKeyRanges(self, _collection_link, feed_options=None, **kwargs) collection_link = "dbs/db/colls/container" collection_id = _base.GetResourceIdOrFullNameFromLink(collection_link) - result = provider._fetch_routing_map( - collection_link=collection_link, - collection_id=collection_id, - previous_routing_map=None, - feed_options={}, - ) - self.assertIsNone(result, "Should return None when full load produces incomplete ranges") + with patch('azure.cosmos._routing.routing_map_provider.time.sleep', return_value=None): + with self.assertRaises(CosmosHttpResponseError) as ctx: + provider._fetch_routing_map( + collection_link=collection_link, + collection_id=collection_id, + previous_routing_map=None, + feed_options={}, + ) + self.assertEqual(ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE) + self.assertEqual(call_count['count'], 3) def test_fetch_routing_map_incremental_with_parents(self): """Incremental update correctly merges child ranges that reference a parent.""" diff --git a/sdk/cosmos/azure-cosmos/tests/routing/test_routing_map_provider_async.py b/sdk/cosmos/azure-cosmos/tests/routing/test_routing_map_provider_async.py index 5d7408bb6216..983aa12313e5 100644 --- a/sdk/cosmos/azure-cosmos/tests/routing/test_routing_map_provider_async.py +++ b/sdk/cosmos/azure-cosmos/tests/routing/test_routing_map_provider_async.py @@ -12,7 +12,8 @@ from azure.cosmos import http_constants from typing import Optional, Mapping, Any -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch +from azure.cosmos.exceptions import CosmosHttpResponseError @pytest.mark.cosmosEmulator @@ -316,14 +317,18 @@ async def test_is_cache_stale_etag_logic_async(self): mock_map2.change_feed_etag = cached_map.change_feed_etag self.assertFalse(provider._is_cache_stale(collection_id, mock_map2)) 
- async def test_fetch_routing_map_full_load_with_incomplete_ranges_returns_none_async(self): - """When a full load (previous_routing_map=None) returns gapped ranges, returns None immediately.""" + async def test_fetch_routing_map_full_load_with_incomplete_ranges_surfaces_503_async(self): + """When a full load (previous_routing_map=None) repeatedly returns + gapped ranges, the retry budget should be exhausted and the provider + should surface a retryable HTTP 503.""" incomplete_ranges = [ {'id': '0', 'minInclusive': '', 'maxExclusive': '80'} # Gap from 80 to FF ] + call_count = {'count': 0} class IncompleteClient: def _ReadPartitionKeyRanges(self, _collection_link, feed_options=None, **kwargs): + call_count['count'] += 1 TestRoutingMapProviderAsync._capture_internal_headers(kwargs, '"incomplete-etag"') async def _gen(): @@ -337,13 +342,19 @@ async def _gen(): collection_link = "dbs/db/colls/container" collection_id = _base.GetResourceIdOrFullNameFromLink(collection_link) - result = await provider._fetch_routing_map( - collection_link=collection_link, - collection_id=collection_id, - previous_routing_map=None, - feed_options={}, - ) - self.assertIsNone(result, "Should return None when full load produces incomplete ranges") + async def _no_sleep(_seconds): + return None + + with patch('azure.cosmos._routing.aio.routing_map_provider.asyncio.sleep', new=_no_sleep): + with self.assertRaises(CosmosHttpResponseError) as ctx: + await provider._fetch_routing_map( + collection_link=collection_link, + collection_id=collection_id, + previous_routing_map=None, + feed_options={}, + ) + self.assertEqual(ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE) + self.assertEqual(call_count['count'], 3) async def test_fetch_routing_map_incremental_with_parents_async(self): """Incremental update correctly merges child ranges that reference a parent.""" diff --git a/sdk/cosmos/azure-cosmos/tests/test_container_rid_header_unit.py 
b/sdk/cosmos/azure-cosmos/tests/test_container_rid_header_unit.py index 8ea0bda6dbe3..04214d6c3f9a 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_container_rid_header_unit.py +++ b/sdk/cosmos/azure-cosmos/tests/test_container_rid_header_unit.py @@ -10,6 +10,7 @@ import unittest from typing import Optional, Mapping, Any +from unittest.mock import patch from azure.cosmos._routing import routing_range from azure.cosmos._routing.routing_map_provider import ( PartitionKeyRangeCache, @@ -17,6 +18,7 @@ ) from azure.cosmos._routing.collection_routing_map import CollectionRoutingMap from azure.cosmos import _base, http_constants +from azure.cosmos.exceptions import CosmosHttpResponseError # ===================================================================== @@ -333,11 +335,10 @@ def _ReadPartitionKeyRanges(self, _collection_link, feed_options=None, **kwargs) "returning a delta instead of the complete set of ranges" ) - def test_full_load_with_incomplete_ranges_returns_none(self): - """When a full load (no previous routing map) returns ranges with gaps, - CompleteRoutingMap returns None. 
The method must return None immediately - without retrying — there is no incremental state to fall back from, and - repeating the identical request would produce the same result.""" + def test_full_load_with_incomplete_ranges_surfaces_503(self): + """When a full load (no previous routing map) repeatedly returns gapped + ranges, the retry budget should be exhausted and _fetch_routing_map + should surface a retryable HTTP 503.""" class IncompleteRangesClient: """Returns ranges with a gap — CompleteRoutingMap will return None.""" @@ -354,16 +355,15 @@ def _ReadPartitionKeyRanges(self, _collection_link, feed_options=None, **kwargs) client = IncompleteRangesClient() cache = PartitionKeyRangeCache(client) - result = cache._fetch_routing_map( - COLLECTION_LINK, - _base.GetResourceIdOrFullNameFromLink(COLLECTION_LINK), - None, # full load (no previous map) - {}, - ) - assert result is None, ( - "Full load with incomplete ranges must return None " - "instead of retrying infinitely" - ) + with patch('azure.cosmos._routing.routing_map_provider.time.sleep', return_value=None): + with self.assertRaises(CosmosHttpResponseError) as ctx: + cache._fetch_routing_map( + COLLECTION_LINK, + _base.GetResourceIdOrFullNameFromLink(COLLECTION_LINK), + None, # full load (no previous map) + {}, + ) + self.assertEqual(ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE) def test_incremental_fallback_to_full_load_succeeds(self): """When an incremental (change-feed) update fails because a returned diff --git a/sdk/cosmos/azure-cosmos/tests/test_container_rid_header_unit_async.py b/sdk/cosmos/azure-cosmos/tests/test_container_rid_header_unit_async.py index 85b90785fbfc..853545d64a62 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_container_rid_header_unit_async.py +++ b/sdk/cosmos/azure-cosmos/tests/test_container_rid_header_unit_async.py @@ -10,6 +10,7 @@ import unittest from typing import Optional, Mapping, Any, Dict +from unittest.mock import patch from 
azure.cosmos._routing import routing_range from azure.cosmos._routing.aio.routing_map_provider import ( PartitionKeyRangeCache, @@ -17,6 +18,7 @@ ) from azure.cosmos._routing.collection_routing_map import CollectionRoutingMap from azure.cosmos import _base, http_constants +from azure.cosmos.exceptions import CosmosHttpResponseError # ===================================================================== @@ -293,11 +295,10 @@ async def _ReadPartitionKeyRanges(self, collection_link, feed_options=None, **kw "returning a delta instead of the complete set of ranges" ) - async def test_full_load_with_incomplete_ranges_returns_none_async(self): - """When a full load (no previous routing map) returns ranges with gaps, - CompleteRoutingMap returns None. The method must return None immediately - without retrying — there is no incremental state to fall back from, and - repeating the identical request would produce the same result.""" + async def test_full_load_with_incomplete_ranges_surfaces_503_async(self): + """When a full load (no previous routing map) repeatedly returns gapped + ranges, the retry budget should be exhausted and _fetch_routing_map + should surface a retryable HTTP 503.""" class IncompleteRangesClient: async def _ReadPartitionKeyRanges(self, collection_link, feed_options=None, **kwargs): @@ -314,16 +315,18 @@ async def _ReadPartitionKeyRanges(self, collection_link, feed_options=None, **kw client = IncompleteRangesClient() cache = PartitionKeyRangeCache(client) - result = await cache._fetch_routing_map( - COLLECTION_LINK, - _base.GetResourceIdOrFullNameFromLink(COLLECTION_LINK), - None, # full load (no previous map) - {}, - ) - assert result is None, ( - "Full load with incomplete ranges must return None " - "instead of retrying infinitely" - ) + async def _no_sleep(_seconds): + return None + + with patch('azure.cosmos._routing.aio.routing_map_provider.asyncio.sleep', new=_no_sleep): + with self.assertRaises(CosmosHttpResponseError) as ctx: + await 
cache._fetch_routing_map( + COLLECTION_LINK, + _base.GetResourceIdOrFullNameFromLink(COLLECTION_LINK), + None, # full load (no previous map) + {}, + ) + self.assertEqual(ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE) async def test_incremental_fallback_to_full_load_succeeds_async(self): """When an incremental (change-feed) update fails because a returned diff --git a/sdk/cosmos/azure-cosmos/tests/test_partition_split_query.py b/sdk/cosmos/azure-cosmos/tests/test_partition_split_query.py index 73e0a21085cb..8985a72825f2 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_partition_split_query.py +++ b/sdk/cosmos/azure-cosmos/tests/test_partition_split_query.py @@ -608,9 +608,8 @@ def test_full_refresh_fallback_stops_infinite_recursion(self): the service returns an incomplete set of partition ranges. When a full load is performed (previous_routing_map=None) and the service - returns gapped ranges, _fetch_routing_map must return None immediately - - there is no incremental state to fall back from, and repeating the - identical request would produce the same result.""" + returns gapped ranges, _fetch_routing_map should surface a retryable + HTTP 503 after exhausting the bounded retry budget.""" container_id = 'test_fallback_guard_' + str(uuid.uuid4()) self.key_database.create_container( id=container_id, @@ -644,19 +643,17 @@ def mock_read_ranges(*args, **kwargs): '_ReadPartitionKeyRanges', side_effect=mock_read_ranges ): - # Full load with incomplete ranges should return None immediately - result = provider._fetch_routing_map( - collection_link=collection_link, - collection_id=collection_id, - previous_routing_map=None, - feed_options={}, - ) - - # Should return None instead of recursing infinitely - assert result is None, \ - "_fetch_routing_map should return None when full load produces incomplete ranges" - - print("Validated: full load with incomplete ranges returns None without recursion") + with 
patch('azure.cosmos._routing.routing_map_provider.time.sleep', return_value=None): + with self.assertRaises(CosmosHttpResponseError) as ctx: + provider._fetch_routing_map( + collection_link=collection_link, + collection_id=collection_id, + previous_routing_map=None, + feed_options={}, + ) + self.assertEqual(ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE) + + print("Validated: full load with incomplete ranges surfaces retryable HTTP 503") finally: self.key_database.delete_container(container_id) diff --git a/sdk/cosmos/azure-cosmos/tests/test_partition_split_query_async.py b/sdk/cosmos/azure-cosmos/tests/test_partition_split_query_async.py index 990d57195de1..fb701f1c613a 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_partition_split_query_async.py +++ b/sdk/cosmos/azure-cosmos/tests/test_partition_split_query_async.py @@ -14,6 +14,7 @@ from azure.cosmos import http_constants from azure.cosmos import _base from azure.cosmos.aio import CosmosClient, DatabaseProxy, ContainerProxy +from azure.cosmos.exceptions import CosmosHttpResponseError async def run_queries(container, iterations): ret_list = [] @@ -592,12 +593,13 @@ async def test_is_cache_stale_etag_comparison_async(self): finally: await self.key_database.delete_container(container_id) - async def test_full_load_with_incomplete_ranges_returns_none_async(self): + async def test_full_load_with_incomplete_ranges_surfaces_503_async(self): """ - Validates that a full load with incomplete ranges returns None immediately. + Validates that a full load with incomplete ranges surfaces a retryable + HTTP 503 after exhausting the bounded retry budget. When a full load is performed (previous_routing_map=None) and the service - returns gapped ranges, _fetch_routing_map should return None without retrying - - there is no incremental state to fall back from. + returns gapped ranges, _fetch_routing_map should not leak internal + map-construction failures to callers. 
""" container_id = 'test_fallback_guard_async_' + str(uuid.uuid4()) await self.key_database.create_container( @@ -632,19 +634,20 @@ async def mock_read_ranges(*args, **kwargs): '_ReadPartitionKeyRanges', side_effect=mock_read_ranges ): - # Full load with incomplete ranges should return None immediately - result = await provider._fetch_routing_map( - collection_link=collection_link, - collection_id=collection_id, - previous_routing_map=None, - feed_options={}, - ) - - # Should return None instead of recursing infinitely - assert result is None, \ - "_fetch_routing_map should return None when full load produces incomplete ranges" - - print("Validated: full load with incomplete ranges returns None without recursion") + async def _no_sleep(_seconds): + return None + + with patch('azure.cosmos._routing.aio.routing_map_provider.asyncio.sleep', new=_no_sleep): + with self.assertRaises(CosmosHttpResponseError) as ctx: + await provider._fetch_routing_map( + collection_link=collection_link, + collection_id=collection_id, + previous_routing_map=None, + feed_options={}, + ) + self.assertEqual(ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE) + + print("Validated: full load with incomplete ranges surfaces retryable HTTP 503") finally: await self.key_database.delete_container(container_id) diff --git a/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit.py b/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit.py index e6b203ae12e8..b95ffd99d296 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit.py +++ b/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit.py @@ -7,16 +7,26 @@ - Empty change feed response (304 Not Modified / zero ranges from incremental update) """ +import logging import threading import time import unittest -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest -from azure.cosmos._routing.routing_map_provider import PartitionKeyRangeCache +from 
azure.cosmos._routing._routing_map_provider_common import ( + _handle_transient_snapshot_retry_decision, + _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS, + process_fetched_ranges, + _IncrementalMergeFailed, +) +from azure.cosmos._routing.routing_map_provider import ( + PartitionKeyRangeCache, +) from azure.cosmos._routing.collection_routing_map import CollectionRoutingMap from azure.cosmos import http_constants +from azure.cosmos.exceptions import CosmosHttpResponseError from azure.cosmos._gone_retry_policy_base import _PartitionKeyRangeGoneRetryPolicyBase @@ -287,9 +297,13 @@ def read_pk_ranges_empty(collection_link, options, response_hook=None, **kwargs) self.assertEqual(ids, ['0'], "Ranges should be preserved from previous map") self.assertEqual(result.change_feed_etag, '"etag-new"', "ETag should be updated") - def test_fetch_routing_map_empty_full_load_returns_none(self): - """_fetch_routing_map should return None when a full load (no previous - map) returns zero ranges — this means the service returned nothing.""" + def test_fetch_routing_map_empty_full_load_raises_503_after_budget(self): + """When a full load (no previous map) repeatedly returns zero ranges, + ``_fetch_routing_map`` should retry up to the overlap budget and then + surface ``CosmosHttpResponseError(status_code=503)`` rather than + returning ``None``. 
Empty ranges hit the same ``_GapDetected`` path as + a gap in the key space; without retry-then-503 the empty result + would later crash ``SmartRoutingMapProvider`` with ``AssertionError``.""" client = MagicMock() def read_pk_ranges_empty(collection_link, options, response_hook=None, **kwargs): @@ -301,14 +315,14 @@ def read_pk_ranges_empty(collection_link, options, response_hook=None, **kwargs) cache = PartitionKeyRangeCache(client) - result = cache._fetch_routing_map( - collection_link="dbs/db1/colls/coll1", - collection_id="dbs/db1/colls/coll1", - previous_routing_map=None, # Full load - feed_options={} - ) - - self.assertIsNone(result, "Full load with empty ranges should return None") + with self.assertRaises(CosmosHttpResponseError) as ctx: + cache._fetch_routing_map( + collection_link="dbs/db1/colls/coll1", + collection_id="dbs/db1/colls/coll1", + previous_routing_map=None, # Full load + feed_options={} + ) + self.assertEqual(ctx.exception.status_code, 503) def test_get_previous_routing_map_exact_key_finds_entry(self): @@ -619,5 +633,453 @@ def read_pk_ranges_cascading(collection_link, options, response_hook=None, **kwa self.assertEqual(result.change_feed_etag, '"etag-old"') + # ========================================================================== + # Helper-level retry-policy unit tests. + # + # These target only the pure helper that computes retry backoff / 503 + # escalation (no cache object, no fetch loop). They pin the contract that + # backoff stays within the deterministic upper bound and that jitter is + # actually applied so concurrent retriers do not retry in lockstep. + # ========================================================================== + + def test_overlap_retry_backoff_is_within_deterministic_upper_bound(self): + """For each non-terminal attempt, the returned backoff must lie inside + ``[0, deterministic_bound]`` where the deterministic bound is the + documented exponential schedule (0.5s, 1.0s, 2.0s). 
This is what + guarantees we never exceed the advertised worst-case wall time.""" + # Build a fresh logger so we don't compete for handlers with the real one. + test_logger = logging.getLogger(__name__ + ".jitter_bounds_test") + + # _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS is 3, so non-terminal attempts are 1 and 2. + # (Attempt 3 raises 503; that branch is exercised by the e2e test below.) + for attempt_index, expected_upper_bound in [(1, 0.5), (2, 1.0)]: + for _ in range(50): # 50 draws per attempt -- catches an + # accidentally-constant return value as well. + backoff = _handle_transient_snapshot_retry_decision( + retry_attempt_count=attempt_index, + collection_link="dbs/db1/colls/coll1", + logger=test_logger, + ) + self.assertGreaterEqual( + backoff, 0.0, + "Jittered backoff must be non-negative (random.uniform " + "with low=0 invariant)." + ) + self.assertLessEqual( + backoff, expected_upper_bound, + "Jittered backoff for attempt {} must not exceed the " + "deterministic upper bound {}s; got {}s. This would " + "violate the worst-case wall-time contract documented " + "on _handle_transient_snapshot_retry_decision.".format( + attempt_index, expected_upper_bound, backoff, + ) + ) + + def test_overlap_retry_backoff_actually_varies_between_calls(self): + """Two consecutive calls for the same attempt index must not return + the same value with overwhelming probability -- otherwise jitter has + regressed to a fixed backoff and concurrent retriers in different + Cosmos clients will land back on the same gateway node in lockstep. + + We draw N samples and assert at least two distinct values. 
The + probability of all-identical draws from ``random.uniform(0, 0.5)`` is + effectively zero in 50 draws, so this is not a flake risk; but if a + future refactor accidentally returns the deterministic backoff, this + test fires loudly.""" + test_logger = logging.getLogger(__name__ + ".jitter_variance_test") + samples = [ + _handle_transient_snapshot_retry_decision( + retry_attempt_count=1, + collection_link="dbs/db1/colls/coll1", + logger=test_logger, + ) + for _ in range(50) + ] + self.assertGreater( + len(set(samples)), 1, + "Overlap-retry backoff produced identical values across 50 draws " + "-- jitter has likely regressed. Each draw should be an " + "independent random.uniform(0, deterministic_bound) sample." + ) + + def test_overlap_retry_raises_503_at_attempt_budget_exhaustion(self): + """At the documented attempt budget (_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS), + the helper must raise CosmosHttpResponseError(503) -- not return a + backoff. This is the only branch that surfaces an exception to the + caller, and it is what lets the upstream Cosmos retry policy take + over instead of the SDK silently giving up. + + Also exercised end-to-end by + ``test_fetch_routing_map_surfaces_503_after_persistent_overlap``, + but this is the focused unit-level guard.""" + test_logger = logging.getLogger(__name__ + ".jitter_budget_test") + with self.assertRaises(CosmosHttpResponseError) as ctx: + _handle_transient_snapshot_retry_decision( + retry_attempt_count=_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS, + collection_link="dbs/db1/colls/coll1", + logger=test_logger, + ) + self.assertEqual( + ctx.exception.status_code, + http_constants.StatusCodes.SERVICE_UNAVAILABLE, + ) + + # ========================================================================== + # Provider retry-loop behavior tests (mocked integration path). + # + # These exercise the sync provider's full fetch/retry loop with mocked + # /pkranges payloads. 
They verify the full path contract: transient + # inconsistencies either recover on retry or surface typed HTTP 503, and + # never leak raw ``ValueError("Ranges overlap")`` to callers. + # ========================================================================== + + def test_fetch_routing_map_recovers_after_transient_overlap(self): + """When the gateway returns an inconsistent paginated /pkranges snapshot + once and a consistent one on retry, the sync cache should populate + cleanly on the second attempt — the customer sees no crash, no missing + rows, just a brief stall.""" + # First call: stale parent + children missing parent reference. + bad_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, # stale parent + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, # missing parents=['10'] + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, # missing parents=['10'] + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + # Second call: consistent snapshot, lineage metadata correctly propagated. 
+ good_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90', 'parents': ['10']}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0', 'parents': ['10']}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + + responses = [bad_payload, good_payload] + call_count = {'n': 0} + + client = MagicMock() + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + payload = responses[call_count['n']] if call_count['n'] < len(responses) else good_payload + call_count['n'] += 1 + headers = {http_constants.HttpHeaders.ETag: '"etag-{}"'.format(call_count['n'])} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + return iter(payload) + + client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + cache = PartitionKeyRangeCache(client) + + # Patch time.sleep so the test does not actually wait the backoff. + with patch('azure.cosmos._routing.routing_map_provider.time.sleep', return_value=None): + result = cache.get_routing_map("dbs/db1/colls/coll1", feed_options={}) + + self.assertIsNotNone( + result, + "Sync cache should populate after the transient overlap clears on retry." + ) + self.assertEqual( + call_count['n'], 2, + "Expected exactly one retry: one failed fetch + one successful fetch." + ) + ids = [r['id'] for r in result._orderedPartitionKeyRanges] + self.assertEqual(ids, ['L', '10/0', '10/1', 'R']) + + def test_fetch_routing_map_surfaces_503_after_persistent_overlap(self): + """If the gateway keeps returning inconsistent snapshots across every + retry attempt on the sync provider, the cache must NOT silently return + empty results from get_overlapping_ranges (correctness bug). 
It must + surface a typed transient HTTP error so the upstream retry policy can + decide what to do.""" + bad_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + call_count = {'n': 0} + client = MagicMock() + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + call_count['n'] += 1 + headers = {http_constants.HttpHeaders.ETag: '"etag-bad"'} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + return iter(bad_payload) + + client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + cache = PartitionKeyRangeCache(client) + + with patch('azure.cosmos._routing.routing_map_provider.time.sleep', return_value=None): + with self.assertRaises(CosmosHttpResponseError) as ctx: + cache.get_routing_map("dbs/db1/colls/coll1", feed_options={}) + + self.assertEqual( + ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE, + "Persistent overlap must surface as HTTP 503 (transient), not as a bare ValueError " + "or as a silent empty-result return." + ) + self.assertEqual( + call_count['n'], _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS, + "Should have made exactly _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS fetch attempts before giving up." 
+ ) + + def test_fetch_routing_map_recovers_after_transient_gap(self): + """Mirror of the overlap-recovery test for the gap side: when the + gateway returns a snapshot with a hole in the key space once and a + consistent one on retry, the sync cache should populate cleanly on + the second attempt rather than letting the empty result reach + ``SmartRoutingMapProvider`` (which would crash with + ``AssertionError``).""" + # First call: gap between "80" and "A0" (parent removed, children not yet visible). + bad_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + # Second call: gap is gone, children have propagated. + good_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + + responses = [bad_payload, good_payload] + call_count = {'n': 0} + + client = MagicMock() + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + payload = responses[call_count['n']] if call_count['n'] < len(responses) else good_payload + call_count['n'] += 1 + headers = {http_constants.HttpHeaders.ETag: '"etag-{}"'.format(call_count['n'])} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + return iter(payload) + + client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + cache = PartitionKeyRangeCache(client) + + with patch('azure.cosmos._routing.routing_map_provider.time.sleep', return_value=None): + result = cache.get_routing_map("dbs/db1/colls/coll1", feed_options={}) + + self.assertIsNotNone( + result, + "Sync cache should populate after the transient gap clears on retry." 
+ ) + self.assertEqual(call_count['n'], 2, "Expected exactly one retry.") + ids = [r['id'] for r in result._orderedPartitionKeyRanges] + self.assertEqual(ids, ['L', '10/0', '10/1', 'R']) + + def test_fetch_routing_map_surfaces_503_after_persistent_gap(self): + """Mirror of the overlap-503 test for the gap side: a persistent gap + across the retry budget must surface as ``CosmosHttpResponseError(503)`` + rather than as an ``AssertionError("code bug: ...")`` from + ``SmartRoutingMapProvider``.""" + bad_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + call_count = {'n': 0} + client = MagicMock() + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + call_count['n'] += 1 + headers = {http_constants.HttpHeaders.ETag: '"etag-bad"'} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + return iter(bad_payload) + + client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + cache = PartitionKeyRangeCache(client) + + with patch('azure.cosmos._routing.routing_map_provider.time.sleep', return_value=None): + with self.assertRaises(CosmosHttpResponseError) as ctx: + cache.get_routing_map("dbs/db1/colls/coll1", feed_options={}) + + self.assertEqual(ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE) + self.assertEqual( + call_count['n'], _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS, + "Should have made exactly _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS fetch attempts before giving up." + ) + + def test_incremental_overlap_converts_to_incremental_merge_failed(self): + """Sync parity with + ``test_incremental_overlap_converts_to_incremental_merge_failed_async``: + + If the incremental-merge path produces overlapping ranges (e.g. 
the + delta contains a range whose key span overlaps an existing cached + range without either side declaring the other a parent), the + ``ValueError("Ranges overlap")`` raised by ``try_combine`` must NOT + escape to the caller. It must convert to ``_IncrementalMergeFailed`` + so the standard fallback path takes over (retry incremental once, + then full-load -- which has its own ``_OverlapDetected`` handler). + This is what guarantees the customer never observes a bare + ``ValueError`` from any of the validator's call sites.""" + + # Existing cached map: '0' covers ['', '80'] and '1' covers ['80', 'FF']. + previous_map = CollectionRoutingMap.CompleteRoutingMap( + [ + ({'id': '0', 'minInclusive': '', 'maxExclusive': '80'}, True), + ({'id': '1', 'minInclusive': '80', 'maxExclusive': 'FF'}, True), + ], + 'coll1', '"etag-prev"' + ) + + # Delta: + # - '0' re-declared with the same span (resolves via the existing + # ``known_range_info_by_id`` lookup -- no parents needed). + # - '2' with ``parents=['1']`` and a span that overlaps '0'. The + # parent-resolution loop succeeds because '1' is in the cache, + # so we reach ``try_combine``. Once '1' is removed as the gone + # parent, the merged map is { '0' ('', '80'), '2' ('40', 'FF') } + # -- '0' overlaps '2' on ['40', '80'], so ``is_complete_set_of_range`` + # raises ``ValueError("Ranges overlap: ...")`` from inside + # ``try_combine``. + bad_delta = [ + {'id': '0', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '2', 'minInclusive': '40', 'maxExclusive': 'FF', 'parents': ['1']}, + ] + + # The wrapper around try_combine must absorb the ValueError and convert + # it to _IncrementalMergeFailed for the caller's retry loop. 
+ with self.assertRaises(_IncrementalMergeFailed): + process_fetched_ranges( + bad_delta, previous_map, 'coll1', 'dbs/db1/colls/coll1', '"etag-new"' + ) + + def test_fetch_routing_map_mixed_overlap_and_gap_signals_share_retry_budget(self): + """The transient-snapshot retry budget is a single counter shared by + BOTH ``_OverlapDetected`` and ``_GapDetected`` signals -- it is not + per-signal-type. If the gateway alternates between overlap snapshots + and gap snapshots across attempts, the SDK must still surface a 503 + after the same ``_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS`` budget. + + Without this guarantee an alternating gateway could starve the + retry policy indefinitely (overlap, gap, overlap, gap, ...), masking + a real partition-routing problem as a slow stall.""" + # Overlap payload: stale parent '10' coexists with its children that + # lack a ``parents`` reference. Triggers ``_OverlapDetected``. + overlap_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + # Gap payload: ['80', 'A0') is missing entirely. Triggers ``_GapDetected``. 
+ gap_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + + responses = [overlap_payload, gap_payload, overlap_payload] + call_count = {'n': 0} + + client = MagicMock() + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + payload = responses[call_count['n']] if call_count['n'] < len(responses) else overlap_payload + call_count['n'] += 1 + headers = {http_constants.HttpHeaders.ETag: '"etag-mixed-{}"'.format(call_count['n'])} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + return iter(payload) + + client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + cache = PartitionKeyRangeCache(client) + + with patch('azure.cosmos._routing.routing_map_provider.time.sleep', return_value=None): + with self.assertRaises(CosmosHttpResponseError) as ctx: + cache.get_routing_map("dbs/db1/colls/coll1", feed_options={}) + + self.assertEqual( + ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE, + "Alternating overlap/gap signals must still surface as HTTP 503 once " + "the shared budget is exhausted." + ) + self.assertEqual( + call_count['n'], _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS, + "Overlap and gap signals must share one retry budget; alternating " + "between them must NOT extend the total number of attempts." + ) + + def test_fetch_routing_map_preserves_existing_cache_entry_when_force_refresh_surfaces_503(self): + """A 503 raised by ``_fetch_routing_map`` during a forced refresh must + NOT corrupt the existing cached routing map. 
The SDK commonly issues + ``force_refresh=True`` during 410/Gone recovery paths; if that refresh + itself fails transiently we want subsequent reads to keep returning the + previously-cached map (a slightly stale answer is far better than a + cache wiped out by a transient gateway hiccup, which would make + every future query pay the full reload cost).""" + # Pre-populate the shared cache with a known-good routing map. + cached_map = _make_complete_routing_map("dbs/db1/colls/coll1", '"etag-cached"') + cache = PartitionKeyRangeCache(MagicMock()) + cache._collection_routing_map_by_item["dbs/db1/colls/coll1"] = cached_map + + # Wire the client to return an inconsistent (overlap) snapshot every + # time -- forces the retry loop to exhaust its budget and raise 503. + bad_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + headers = {http_constants.HttpHeaders.ETag: '"etag-bad"'} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + return iter(bad_payload) + + cache._document_client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + + with patch('azure.cosmos._routing.routing_map_provider.time.sleep', return_value=None): + with self.assertRaises(CosmosHttpResponseError) as ctx: + cache.get_routing_map( + "dbs/db1/colls/coll1", + feed_options={}, + force_refresh=True, + previous_routing_map=cached_map, + ) + + self.assertEqual(ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE) + + # Critical invariant: the previously-cached map must still be 
reachable + # via the same key. A 503 from a forced refresh must never evict good + # cache state -- otherwise every transient gateway blip would force the + # next reader to pay a cold-start cost. + self.assertIs( + cache._collection_routing_map_by_item.get("dbs/db1/colls/coll1"), cached_map, + "Cached routing map must be preserved after a 503 from forced refresh -- " + "transient inconsistencies must not evict good cache state." + ) + self.assertEqual( + cache._collection_routing_map_by_item["dbs/db1/colls/coll1"].change_feed_etag, + '"etag-cached"', + "Cached ETag must remain the pre-503 value (no partial overwrite)." + ) + + if __name__ == "__main__": unittest.main() diff --git a/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit_async.py b/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit_async.py index aa4a06eb9ebc..ff121d3da531 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit_async.py +++ b/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit_async.py @@ -9,14 +9,21 @@ import asyncio import unittest -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest -from azure.cosmos.aio import CosmosClient # noqa: F401 - needed to resolve circular imports -from azure.cosmos._routing.aio.routing_map_provider import PartitionKeyRangeCache +from azure.cosmos._routing.aio.routing_map_provider import ( + PartitionKeyRangeCache, +) from azure.cosmos._routing.collection_routing_map import CollectionRoutingMap +from azure.cosmos._routing._routing_map_provider_common import ( + process_fetched_ranges, + _IncrementalMergeFailed, + _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS, +) from azure.cosmos import http_constants +from azure.cosmos.exceptions import CosmosHttpResponseError from azure.cosmos._gone_retry_policy_base import _PartitionKeyRangeGoneRetryPolicyBase @@ -212,21 +219,25 @@ async def test_fetch_routing_map_empty_incremental_response_async(self): self.assertEqual(ids, ['0'], "Ranges 
should be preserved from previous map") self.assertEqual(result.change_feed_etag, '"etag-new"', "ETag should be updated") - async def test_fetch_routing_map_empty_full_load_returns_none_async(self): - """_fetch_routing_map should return None when a full load (no previous - map) returns zero ranges — this means the service returned nothing.""" + async def test_fetch_routing_map_empty_full_load_raises_503_after_budget_async(self): + """When a full load (no previous map) repeatedly returns zero ranges, + ``_fetch_routing_map`` should retry up to the overlap budget and then + surface ``CosmosHttpResponseError(status_code=503)`` rather than + returning ``None``. Empty ranges hit the same ``_GapDetected`` path as + a gap in the key space; without retry-then-503 the empty result + would later crash ``SmartRoutingMapProvider`` with ``AssertionError``.""" client = _make_mock_async_client(ranges=[], response_etag='"etag"') cache = PartitionKeyRangeCache(client) - result = await cache._fetch_routing_map( - collection_link="dbs/db1/colls/coll1", - collection_id="dbs/db1/colls/coll1", - previous_routing_map=None, - feed_options={} - ) - - self.assertIsNone(result, "Full load with empty ranges should return None") + with self.assertRaises(CosmosHttpResponseError) as ctx: + await cache._fetch_routing_map( + collection_link="dbs/db1/colls/coll1", + collection_id="dbs/db1/colls/coll1", + previous_routing_map=None, + feed_options={} + ) + self.assertEqual(ctx.exception.status_code, 503) async def test_get_previous_routing_map_exact_key_finds_entry_async(self): @@ -489,6 +500,441 @@ async def async_gen(): self.assertEqual(ids, ['4', '5', '3', '1']) self.assertEqual(result.change_feed_etag, '"etag-old"') + # ========================================================================== + # Provider retry-loop behavior tests (mocked integration path). + # + # These cover integration between builder signaling (overlap/gap) and the + # async provider fetch/retry loop. 
With mocked /pkranges payloads we verify + # the full path contract: transient inconsistencies either recover on retry + # or surface typed HTTP 503, and never leak raw ``ValueError`` failures. + # ========================================================================== + + async def test_fetch_routing_map_recovers_after_transient_overlap_async(self): + """When the gateway returns an inconsistent paginated /pkranges snapshot + once and a consistent one on retry, the cache should populate cleanly + with the consistent data on the second attempt — the customer sees no + crash, no missing rows, just a brief stall.""" + # First call: stale parent + children missing parent reference → triggers _OverlapDetected. + bad_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, # stale parent + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, # missing parents=['10'] + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, # missing parents=['10'] + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + # Second call: same logical topology, but with the lineage metadata correctly + # propagated — gateway has now rotated to a consistent snapshot. 
+ good_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90', 'parents': ['10']}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0', 'parents': ['10']}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + + responses = [bad_payload, good_payload] + call_count = {'n': 0} + + client = MagicMock() + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + payload = responses[call_count['n']] if call_count['n'] < len(responses) else good_payload + call_count['n'] += 1 + headers = {http_constants.HttpHeaders.ETag: '"etag-{}"'.format(call_count['n'])} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + + async def async_gen(): + for r in payload: + yield r + + return async_gen() + + client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + cache = PartitionKeyRangeCache(client) + + # Patch asyncio.sleep so the test does not actually wait the backoff. + async def _no_sleep(_seconds): + return None + + with patch( + 'azure.cosmos._routing.aio.routing_map_provider.asyncio.sleep', + new=_no_sleep, + ): + result = await cache.get_routing_map("dbs/db1/colls/coll1", feed_options={}) + + self.assertIsNotNone( + result, + "Cache should populate after the transient overlap clears on retry." + ) + self.assertEqual( + call_count['n'], 2, + "Expected exactly one retry: one failed fetch + one successful fetch." + ) + ids = [r['id'] for r in result._orderedPartitionKeyRanges] + # Post-fix expected ordering: L, 10/0, 10/1, R (the stale parent '10' + # is correctly filtered on the consistent retry payload). 
+ self.assertEqual(ids, ['L', '10/0', '10/1', 'R']) + + async def test_fetch_routing_map_surfaces_503_after_persistent_overlap_async(self): + """If the gateway keeps returning inconsistent snapshots through every + retry attempt, the cache should NOT silently return empty results from + get_overlapping_ranges (which would be a correctness bug masquerading + as zero data). It must surface a typed transient HTTP error so the + upstream retry policy can decide what to do.""" + bad_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + call_count = {'n': 0} + client = MagicMock() + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + call_count['n'] += 1 + headers = {http_constants.HttpHeaders.ETag: '"etag-bad"'} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + + async def async_gen(): + for r in bad_payload: + yield r + + return async_gen() + + client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + cache = PartitionKeyRangeCache(client) + + async def _no_sleep(_seconds): + return None + + with patch( + 'azure.cosmos._routing.aio.routing_map_provider.asyncio.sleep', + new=_no_sleep, + ): + with self.assertRaises(CosmosHttpResponseError) as ctx: + await cache.get_routing_map("dbs/db1/colls/coll1", feed_options={}) + + self.assertEqual( + ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE, + "Persistent overlap must surface as HTTP 503 (transient), not as a bare ValueError " + "or as a silent empty-result return." + ) + # We should have exhausted the full retry budget (3 attempts by default). 
+ self.assertEqual( + call_count['n'], _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS, + "Should have made exactly _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS fetch attempts before giving up." + ) + + async def test_fetch_routing_map_recovers_after_transient_gap_async(self): + """Mirror of the overlap-recovery test for the gap side: when the + gateway returns a snapshot with a hole in the key space once and a + consistent one on retry, the async cache should populate cleanly on + the second attempt rather than letting the empty result reach + ``SmartRoutingMapProvider`` (which would crash with + ``AssertionError``).""" + bad_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + good_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + + responses = [bad_payload, good_payload] + call_count = {'n': 0} + + client = MagicMock() + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + payload = responses[call_count['n']] if call_count['n'] < len(responses) else good_payload + call_count['n'] += 1 + headers = {http_constants.HttpHeaders.ETag: '"etag-{}"'.format(call_count['n'])} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + + async def async_gen(): + for r in payload: + yield r + + return async_gen() + + client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + cache = PartitionKeyRangeCache(client) + + async def _no_sleep(_seconds): + return None + + with patch( + 'azure.cosmos._routing.aio.routing_map_provider.asyncio.sleep', + new=_no_sleep, + ): + result = await cache.get_routing_map("dbs/db1/colls/coll1", 
feed_options={}) + + self.assertIsNotNone( + result, + "Async cache should populate after the transient gap clears on retry." + ) + self.assertEqual(call_count['n'], 2, "Expected exactly one retry.") + ids = [r['id'] for r in result._orderedPartitionKeyRanges] + self.assertEqual(ids, ['L', '10/0', '10/1', 'R']) + + async def test_fetch_routing_map_surfaces_503_after_persistent_gap_async(self): + """Mirror of the overlap-503 test for the gap side: a persistent gap + across the retry budget must surface as ``CosmosHttpResponseError(503)`` + rather than as an ``AssertionError("code bug: ...")`` from + ``SmartRoutingMapProvider``.""" + bad_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + call_count = {'n': 0} + client = MagicMock() + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + call_count['n'] += 1 + headers = {http_constants.HttpHeaders.ETag: '"etag-bad"'} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + + async def async_gen(): + for r in bad_payload: + yield r + + return async_gen() + + client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + cache = PartitionKeyRangeCache(client) + + async def _no_sleep(_seconds): + return None + + with patch( + 'azure.cosmos._routing.aio.routing_map_provider.asyncio.sleep', + new=_no_sleep, + ): + with self.assertRaises(CosmosHttpResponseError) as ctx: + await cache.get_routing_map("dbs/db1/colls/coll1", feed_options={}) + + self.assertEqual(ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE) + self.assertEqual(call_count['n'], _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS) + + async def test_incremental_overlap_converts_to_incremental_merge_failed_async(self): + """If the incremental-merge path produces overlapping ranges (e.g. 
the + delta contains a range whose key span overlaps an existing cached + range without either side declaring the other a parent), the + ``ValueError("Ranges overlap")`` raised by ``try_combine`` must NOT + escape to the caller. It must convert to ``_IncrementalMergeFailed`` + so the standard fallback path takes over (retry incremental once, + then full-load — which has its own ``_OverlapDetected`` handler). + This is what guarantees the customer never observes a bare + ``ValueError`` from any of the validator's call sites.""" + + # Existing cached map: '0' covers ['', '80'] and '1' covers ['80', 'FF']. + previous_map = CollectionRoutingMap.CompleteRoutingMap( + [ + ({'id': '0', 'minInclusive': '', 'maxExclusive': '80'}, True), + ({'id': '1', 'minInclusive': '80', 'maxExclusive': 'FF'}, True), + ], + 'coll1', '"etag-prev"' + ) + + # Delta: + # - '0' re-declared with the same span (resolves via the existing + # ``known_range_info_by_id`` lookup — no parents needed). + # - '2' with ``parents=['1']`` and a span that overlaps '0'. The + # parent-resolution loop succeeds because '1' is in the cache, + # so we reach ``try_combine``. Once '1' is removed as the gone + # parent, the merged map is { '0' ('', '80'), '2' ('40', 'FF') } + # — '0' overlaps '2' on ['40', '80'], so ``is_complete_set_of_range`` + # raises ``ValueError("Ranges overlap: ...")`` from inside + # ``try_combine``. + bad_delta = [ + {'id': '0', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '2', 'minInclusive': '40', 'maxExclusive': 'FF', 'parents': ['1']}, + ] + + # The wrapper around try_combine must absorb the ValueError and convert + # it to _IncrementalMergeFailed for the caller's retry loop. 
+ with self.assertRaises(_IncrementalMergeFailed): + process_fetched_ranges( + bad_delta, previous_map, 'coll1', 'dbs/db1/colls/coll1', '"etag-new"' + ) + + async def test_fetch_routing_map_mixed_overlap_and_gap_signals_share_retry_budget_async(self): + """Async mirror of + ``test_fetch_routing_map_mixed_overlap_and_gap_signals_share_retry_budget``. + + The transient-snapshot retry budget is a single counter shared by + BOTH ``_OverlapDetected`` and ``_GapDetected`` signals -- it is not + per-signal-type. If the gateway alternates between overlap snapshots + and gap snapshots across attempts, the SDK must still surface a 503 + after the same ``_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS`` budget. + + Async coverage is independent of the sync test because the async + ``_fetch_routing_map`` has its own ``except (_OverlapDetected, + _GapDetected)`` block and increments its own + ``inconsistency_attempt_count`` local under ``async with`` lock + scoping -- a future refactor that, for example, reset the counter + on signal-type change, or that lost the counter across an + ``await`` boundary, would only be caught by exercising the async + codepath end-to-end.""" + # Overlap payload: stale parent '10' coexists with its children that + # lack a ``parents`` reference. Triggers ``_OverlapDetected``. + overlap_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + # Gap payload: ['80', 'A0') is missing entirely. Triggers ``_GapDetected``. 
+ gap_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + + responses = [overlap_payload, gap_payload, overlap_payload] + call_count = {'n': 0} + + client = MagicMock() + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + payload = responses[call_count['n']] if call_count['n'] < len(responses) else overlap_payload + call_count['n'] += 1 + headers = {http_constants.HttpHeaders.ETag: '"etag-mixed-{}"'.format(call_count['n'])} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + + async def async_gen(): + for r in payload: + yield r + + return async_gen() + + client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + cache = PartitionKeyRangeCache(client) + + async def _no_sleep(_seconds): + return None + + with patch( + 'azure.cosmos._routing.aio.routing_map_provider.asyncio.sleep', + new=_no_sleep, + ): + with self.assertRaises(CosmosHttpResponseError) as ctx: + await cache.get_routing_map("dbs/db1/colls/coll1", feed_options={}) + + self.assertEqual( + ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE, + "Alternating overlap/gap signals must still surface as HTTP 503 once " + "the shared budget is exhausted." + ) + self.assertEqual( + call_count['n'], _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS, + "Overlap and gap signals must share one retry budget; alternating " + "between them must NOT extend the total number of attempts." + ) + + async def test_fetch_routing_map_preserves_existing_cache_entry_when_force_refresh_surfaces_503_async(self): + """Async mirror of + ``test_fetch_routing_map_preserves_existing_cache_entry_when_force_refresh_surfaces_503``. + + A 503 raised by ``_fetch_routing_map`` during a forced refresh must + NOT corrupt the existing cached routing map. 
The SDK commonly issues + ``force_refresh=True`` during 410/Gone recovery paths; if that refresh + itself fails transiently we want subsequent reads to keep returning + the previously-cached map (a slightly stale answer is far better than + a cache wiped out by a transient gateway hiccup). + + Async coverage is independent of the sync test because the async + ``get_routing_map`` writes to the cache from inside an ``async with`` + block. If a future refactor moved the write before the inner + ``try``/``except``, or if exception unwinding through the async + context manager somehow cleared the entry, only the async-level + end-to-end test would catch it.""" + # Pre-populate the shared cache with a known-good routing map. + cached_map = _make_complete_routing_map("dbs/db1/colls/coll1", '"etag-cached"') + cache = PartitionKeyRangeCache(MagicMock()) + cache._collection_routing_map_by_item["dbs/db1/colls/coll1"] = cached_map + + # Wire the client to return an inconsistent (overlap) snapshot every + # time -- forces the retry loop to exhaust its budget and raise 503. 
+ bad_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + headers = {http_constants.HttpHeaders.ETag: '"etag-bad"'} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + + async def async_gen(): + for r in bad_payload: + yield r + + return async_gen() + + cache._document_client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + + async def _no_sleep(_seconds): + return None + + with patch( + 'azure.cosmos._routing.aio.routing_map_provider.asyncio.sleep', + new=_no_sleep, + ): + with self.assertRaises(CosmosHttpResponseError) as ctx: + await cache.get_routing_map( + "dbs/db1/colls/coll1", + feed_options={}, + force_refresh=True, + previous_routing_map=cached_map, + ) + + self.assertEqual(ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE) + + # Critical invariant: the previously-cached map must still be reachable + # via the same key. A 503 from a forced refresh must never evict good + # cache state -- otherwise every transient gateway blip would force the + # next reader to pay a cold-start cost. + self.assertIs( + cache._collection_routing_map_by_item.get("dbs/db1/colls/coll1"), cached_map, + "Cached routing map must be preserved after a 503 from forced refresh -- " + "transient inconsistencies must not evict good cache state." + ) + self.assertEqual( + cache._collection_routing_map_by_item["dbs/db1/colls/coll1"].change_feed_etag, + '"etag-cached"', + "Cached ETag must remain the pre-503 value (no partial overwrite)." 
+ ) + if __name__ == "__main__": unittest.main()