From 34447a086b800634860f495864e16182ff08bb37 Mon Sep 17 00:00:00 2001 From: dibahlfi <106994927+dibahlfi@users.noreply.github.com> Date: Fri, 22 May 2026 17:11:34 -0500 Subject: [PATCH 1/3] fixing ValueError bug --- .../_routing/_routing_map_provider_common.py | 143 +++++++++++- .../_routing/aio/routing_map_provider.py | 26 +++ .../cosmos/_routing/collection_routing_map.py | 93 +++++++- .../cosmos/_routing/routing_map_provider.py | 27 +++ .../routing/test_collection_routing_map.py | 142 +++++++++++- .../tests/test_routing_map_provider_unit.py | 215 +++++++++++++++++- .../test_routing_map_provider_unit_async.py | 191 +++++++++++++++- 7 files changed, 822 insertions(+), 15 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/_routing_map_provider_common.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/_routing_map_provider_common.py index ce579fdb258a..806be8544695 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/_routing_map_provider_common.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/_routing_map_provider_common.py @@ -28,10 +28,16 @@ """ import logging +import random from typing import Any, Dict, List, Optional, Tuple from .. import _base, http_constants -from .collection_routing_map import CollectionRoutingMap, _build_routing_map_from_ranges +from ..exceptions import CosmosHttpResponseError +from .collection_routing_map import ( + CollectionRoutingMap, + _build_routing_map_from_ranges, + _OverlapDetected, # noqa: F401 # re-exported for sync/async provider modules and tests +) from . import routing_range from .routing_range import ( PKRange, @@ -44,6 +50,114 @@ PAGE_SIZE_CHANGE_FEED = "-1" # Return all available changes +# Number of times the full-load path will re-fetch ``/pkranges`` when the +# builder reports an overlap (``_OverlapDetected``). Overlap on the full-load +# path is treated as a transient gateway inconsistency, so a small fixed +# retry budget with backoff is preferred over surfacing immediately. 
After +# this many attempts the caller surfaces a transient HTTP 503 so the +# upstream retry policy can take over. +# +# Defined here (rather than in each provider module) so the sync and async +# providers cannot drift on the retry budget — both import the same constant. +_OVERLAP_RETRY_MAX_ATTEMPTS = 3 +# Initial backoff between overlap retries; doubles each attempt. Worst-case +# total sleep under the budget above is ~1.5s (0.5 + 1.0); attempt 3 raises. +_OVERLAP_RETRY_INITIAL_BACKOFF_SECONDS = 0.5 + + +def _jittered_backoff(backoff_seconds: float) -> float: + """Return a uniformly-jittered backoff in the range ``[0, backoff_seconds]``. + + Implements the "full jitter" strategy: the actual sleep is drawn uniformly + from zero to the full deterministic backoff. This decorrelates concurrent + retriers (for example, multiple Cosmos clients running inside a single + PySpark process that all hit the same gateway node on the same bad + ``/pkranges`` snapshot at the same instant) so they do not retry in + lockstep and re-collide on the same gateway node. + + The worst-case sleep per attempt is unchanged (still bounded by the + deterministic backoff), so the documented retry-budget contract still + holds; the expected per-attempt sleep is half of it. + """ + return random.uniform(0, backoff_seconds) + + +def _handle_overlap_retry_decision( + *, + overlap_attempt_count: int, + collection_link: str, + logger: logging.Logger, # pylint: disable=redefined-outer-name +) -> float: + """Decide what to do after the full-load builder reported an overlap. + + Centralises the sync/async-identical retry policy. Returns the number of + seconds the caller should sleep before the next attempt. Raises + :class:`CosmosHttpResponseError` (HTTP 503) when the attempt budget has + been exhausted; the caller's existing retry policy then handles it as + a transient error. + + The returned sleep duration is jittered (see :func:`_jittered_backoff`) + so concurrent retriers do not retry in lockstep. 
The deterministic + backoff schedule (0.5s -> 1.0s -> 2.0s, doubling) defines the *upper + bound* of each attempt's sleep; the actual sleep is drawn uniformly + from ``[0, that upper bound]``. + + The caller is responsible for the actual sleep (sync ``time.sleep`` or + ``await asyncio.sleep``). Keeping the sleep at the call site is what + lets this helper stay free of concurrency-runtime assumptions — the + only line that has to differ between the sync and async providers. + + :param int overlap_attempt_count: Number of overlap attempts made so far, + including the one that just failed. Pass ``1`` after the first failure, + ``2`` after the second, etc. + :param str collection_link: Used in log messages and the 503 error body + so the caller knows which collection ran out of budget. + :param logging.Logger logger: Caller's module-level logger, so messages + appear under the right ``azure.cosmos._routing.*`` namespace. + :return: Jittered backoff seconds to sleep before retrying. Guaranteed + to be in ``[0, deterministic_backoff_for_attempt]``. + :rtype: float + :raises CosmosHttpResponseError: When ``overlap_attempt_count`` has reached + ``_OVERLAP_RETRY_MAX_ATTEMPTS``. Status code is 503 so the upstream + retry policy classifies it as transient. + """ + if overlap_attempt_count >= _OVERLAP_RETRY_MAX_ATTEMPTS: + logger.error( + "Full-load routing-map fetch for collection '%s' detected " + "overlapping partition key ranges on every one of %d attempt(s). " + "Surfacing as transient HTTP 503 so the caller's retry policy " + "can take over.", + collection_link, + overlap_attempt_count, + ) + raise CosmosHttpResponseError( + status_code=http_constants.StatusCodes.SERVICE_UNAVAILABLE, + message=( + "Failed to build routing map for collection '{}': " + "overlapping partition key ranges persisted across {} " + "full-load attempt(s). 
Surfaced as a retryable transient " + "error so the upstream retry policy can take over, rather " + "than allowing the underlying ValueError to escape as a " + "fatal crash." + ).format(collection_link, overlap_attempt_count), + ) + + deterministic_backoff = ( + _OVERLAP_RETRY_INITIAL_BACKOFF_SECONDS * (2 ** (overlap_attempt_count - 1)) + ) + jittered_backoff = _jittered_backoff(deterministic_backoff) + logger.warning( + "Full-load routing-map fetch for collection '%s' detected overlapping " + "partition key ranges (attempt %d/%d). Sleeping %.2fs (jittered from " + "upper bound %.2fs) and retrying.", + collection_link, + overlap_attempt_count, + _OVERLAP_RETRY_MAX_ATTEMPTS, + jittered_backoff, + deterministic_backoff, + ) + return jittered_backoff + def is_cache_unchanged_since_previous( collection_routing_map_by_item: Dict[str, CollectionRoutingMap], @@ -257,7 +371,32 @@ def process_fetched_ranges( unresolved = next_unresolved - result = previous_routing_map.try_combine(range_tuples, effective_etag) + try: + result = previous_routing_map.try_combine(range_tuples, effective_etag) + except ValueError as overlap_error: + # ``try_combine`` validates the merged map via + # ``CollectionRoutingMap.is_complete_set_of_range`` and raises + # ``ValueError("Ranges overlap: ...")`` if the merge produces a + # self-contradictory tiling. This can happen during the incremental + # path when the delta contains a range whose key span overlaps an + # existing cached range without either side declaring the other a + # parent. + # + # We must NOT let this ``ValueError`` escape: the cache layer above + # treats a ``None`` routing map as "no ranges" and would convert + # the bare exception into a silent empty-result return at + # ``get_overlapping_ranges``. 
Convert to ``_IncrementalMergeFailed`` + # so the caller's existing retry loop retries the incremental fetch + # once and then falls back to the full-load path, which has its own + # ``_OverlapDetected`` handler with retry+backoff and surfaces a + # transient HTTP 503 if the inconsistency persists. + logger.warning( + "Incremental merge for collection '%s' produced overlapping ranges: %s. " + "Converting to _IncrementalMergeFailed so the caller retries / " + "falls back to a full refresh.", + collection_link, str(overlap_error), + ) + raise _IncrementalMergeFailed() from overlap_error if not result: logger.warning( "Incremental merge resulted in incomplete routing map for " diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py index 4cfb429ab7e3..33129e95ae21 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py @@ -38,6 +38,9 @@ determine_refresh_action, get_smart_overlapping_ranges, _IncrementalMergeFailed, + _OverlapDetected, + _OVERLAP_RETRY_MAX_ATTEMPTS, # noqa: F401 # re-exported for tests + _handle_overlap_retry_decision, ) @@ -103,6 +106,10 @@ # Number of extra incremental attempts after an incomplete incremental merge # before falling back to a full routing-map refresh. _INCOMPLETE_ROUTING_MAP_MAX_RETRIES = 1 + +# Overlap-retry budget and backoff live in ``_routing_map_provider_common`` so +# the sync and async providers cannot drift on them. ``_OVERLAP_RETRY_MAX_ATTEMPTS`` +# is re-exported through this module for test imports. 
class PartitionKeyRangeCache(object): """ PartitionKeyRangeCache provides list of effective partition key ranges for a @@ -343,6 +350,7 @@ async def _fetch_routing_map( """ current_previous_map = previous_routing_map incomplete_attempt_count = 0 + overlap_attempt_count = 0 while True: request_kwargs = dict(kwargs) @@ -398,6 +406,24 @@ async def _fetch_routing_map( continue raise + except _OverlapDetected: + # The full-load builder reported overlapping ranges. Apply + # the retry-on-overlap policy: ``_handle_overlap_retry_decision`` + # either returns a backoff to sleep or raises ``CosmosHttpResponseError`` + # (503) when the attempt budget is exhausted. Reset + # ``current_previous_map`` to ``None`` so the next iteration + # runs the full-load path regardless of which path tripped + # the overlap — we do not want to keep retrying an incremental + # fetch against the same inconsistent base. + overlap_attempt_count += 1 + backoff = _handle_overlap_retry_decision( + overlap_attempt_count=overlap_attempt_count, + collection_link=collection_link, + logger=logger, + ) + await asyncio.sleep(backoff) + current_previous_map = None + continue async def get_range_by_partition_key_range_id( self, diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/collection_routing_map.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/collection_routing_map.py index ba719f955a72..555737822685 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/collection_routing_map.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/collection_routing_map.py @@ -29,6 +29,30 @@ from azure.cosmos._routing import routing_range from azure.cosmos._routing.routing_range import PartitionKeyRange, PKRange + +class _OverlapDetected(Exception): + """Sentinel raised by :func:`_build_routing_map_from_ranges` when the + ``/pkranges`` response contains overlapping ranges that the SDK cannot + reconcile from a single snapshot. 
+ + Distinct from a plain ``ValueError`` so the caller can identify the + overlap case explicitly and apply the retry-on-overlap policy instead + of treating it as a fatal programmer error. Overlap on this path is + expected to be transient (a paginated ``/pkranges`` response that is + not snapshot-isolated across gateway nodes), so a short retry budget + with backoff is sufficient. + + The caller (``_fetch_routing_map`` on both sync and async cache + providers) catches this sentinel, sleeps briefly, and retries the + fetch. After a small number of failed retries it surfaces a typed + transient HTTP error so the upstream retry policy can take over. The + sentinel is intentionally not allowed to escape to the query layer + as a bare ``ValueError`` — that path would otherwise convert into a + silent empty-result return at ``get_overlapping_ranges`` (which + treats a ``None`` routing map as "no ranges"), masking the failure + as a correctness bug. + """ + # pylint: disable=line-too-long class CollectionRoutingMap(object): """Stores partition key ranges in an efficient way with some additional @@ -196,7 +220,23 @@ def is_complete_set_of_range(ordered_partition_key_range_list): if not isComplete: if previousRange[PartitionKeyRange.MaxExclusive] > currentRange[PartitionKeyRange.MinInclusive]: - raise ValueError("Ranges overlap") + # Include the offending pair in the message so whoever + # investigates the next occurrence has actionable + # diagnostics without having to reproduce the failure + # under a debugger. Keep the literal substring + # "Ranges overlap" for backwards compatibility with + # any caller that pattern-matches on it. 
+ raise ValueError( + "Ranges overlap: previous range id={!r} ({!r} -> {!r}) " + "overlaps current range id={!r} ({!r} -> {!r})".format( + previousRange.get(PartitionKeyRange.Id), + previousRange[PartitionKeyRange.MinInclusive], + previousRange[PartitionKeyRange.MaxExclusive], + currentRange.get(PartitionKeyRange.Id), + currentRange[PartitionKeyRange.MinInclusive], + currentRange[PartitionKeyRange.MaxExclusive], + ) + ) break return isComplete @@ -270,7 +310,10 @@ def _build_routing_map_from_ranges( Filters out parent (gone) ranges and validates that the remaining ranges form a complete, gap-free partition key space. Returns None if the ranges - are incomplete. + are incomplete (gap), and raises ``_OverlapDetected`` if the ranges + overlap — the caller is expected to retry the fetch on the overlap case, + since overlap on the full-load path is treated as a transient gateway + inconsistency rather than a permanent input error. This is shared between the sync and async PartitionKeyRangeCache to avoid code duplication — the logic is purely synchronous. @@ -280,9 +323,27 @@ def _build_routing_map_from_ranges( :param str new_etag: The ETag from the change feed response. :param str collection_link: The collection link, used for log messages. :param logging.Logger _logger: Logger instance for error reporting. - :return: A complete CollectionRoutingMap, or None if the ranges are incomplete. + :return: A complete CollectionRoutingMap, or None if the ranges are incomplete (gap). :rtype: Optional[CollectionRoutingMap] + :raises _OverlapDetected: If the ranges contain an overlap that could not + be resolved from this single snapshot. The caller should retry the + fetch; see :class:`_OverlapDetected` for the rationale. """ + # Dedup the input by id BEFORE parent filtering and validation. 
At high + # partition counts the /pkranges response is paginated, and pagination + # is not snapshot-isolated across gateway nodes — consecutive pages can + # legitimately return the same range id when the page boundary falls + # between two nodes with one-tick-out-of-sync caches. Without this dedup + # the duplicate would survive into the sortedRanges list inside + # CompleteRoutingMap and trip the overlap check on two identical entries. + # Last-write-wins is safe: duplicates describe the same logical range + # and any later occurrence carries the same id, min/max, and (when + # present) the more-complete metadata such as ``parents``. + deduped_by_id: dict = {} + for r in ranges: + deduped_by_id[r[PartitionKeyRange.Id]] = r + ranges = list(deduped_by_id.values()) + gone_range_ids = set() for r in ranges: if PartitionKeyRange.Parents in r and r[PartitionKeyRange.Parents]: @@ -294,11 +355,27 @@ def _build_routing_map_from_ranges( ] range_tuples = [(r, True) for r in filtered_ranges] - routing_map = CollectionRoutingMap.CompleteRoutingMap( - range_tuples, - collection_id, - new_etag - ) + try: + routing_map = CollectionRoutingMap.CompleteRoutingMap( + range_tuples, + collection_id, + new_etag + ) + except ValueError as overlap_error: + # ``is_complete_set_of_range`` raises ``ValueError("Ranges overlap: ...")`` + # when the post-filter range list still contains an overlap. Convert + # it to ``_OverlapDetected`` so the caller can apply the retry-on- + # overlap policy. The bare ``ValueError`` must NOT escape to the + # cache layer: that path converts into a silent empty-result return + # at ``get_overlapping_ranges`` (which treats ``None`` from the + # cache as "no ranges"), masking the failure as a correctness bug. + _logger.warning( + "Full load of routing map for collection '%s' detected overlapping " + "partition key ranges: %s. 
Signalling caller to retry the " + "/pkranges fetch.", + collection_link, str(overlap_error), + ) + raise _OverlapDetected() from overlap_error if not routing_map: _logger.error( diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py index 92abe54cba94..8bf2c2e5951f 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py @@ -23,6 +23,7 @@ Cosmos database service. """ import threading +import time import logging from typing import Dict, Any, Optional, List, TYPE_CHECKING from azure.core.utils import CaseInsensitiveDict @@ -37,6 +38,9 @@ determine_refresh_action, get_smart_overlapping_ranges, _IncrementalMergeFailed, + _OverlapDetected, + _OVERLAP_RETRY_MAX_ATTEMPTS, # noqa: F401 # re-exported for tests + _handle_overlap_retry_decision, ) if TYPE_CHECKING: @@ -93,6 +97,10 @@ # Number of extra incremental attempts after an incomplete incremental merge # before falling back to a full routing-map refresh. _INCOMPLETE_ROUTING_MAP_MAX_RETRIES = 1 + +# Overlap-retry budget and backoff live in ``_routing_map_provider_common`` so +# the sync and async providers cannot drift on them. ``_OVERLAP_RETRY_MAX_ATTEMPTS`` +# is re-exported through this module for test imports. class PartitionKeyRangeCache(object): """ PartitionKeyRangeCache provides list of effective partition key ranges for a @@ -314,6 +322,7 @@ def _fetch_routing_map( """ current_previous_map = previous_routing_map incomplete_attempt_count = 0 + overlap_attempt_count = 0 while True: request_kwargs = dict(kwargs) @@ -368,6 +377,24 @@ def _fetch_routing_map( continue raise + except _OverlapDetected: + # The full-load builder reported overlapping ranges. 
Apply + # the retry-on-overlap policy: ``_handle_overlap_retry_decision`` + # either returns a backoff to sleep or raises ``CosmosHttpResponseError`` + # (503) when the attempt budget is exhausted. Reset + # ``current_previous_map`` to ``None`` so the next iteration + # runs the full-load path regardless of which path tripped + # the overlap — we do not want to keep retrying an incremental + # fetch against the same inconsistent base. + overlap_attempt_count += 1 + backoff = _handle_overlap_retry_decision( + overlap_attempt_count=overlap_attempt_count, + collection_link=collection_link, + logger=logger, + ) + time.sleep(backoff) + current_previous_map = None + continue def get_overlapping_ranges(self, collection_link, partition_key_ranges, feed_options, **kwargs): """Given a partition key range and a collection, return the list of diff --git a/sdk/cosmos/azure-cosmos/tests/routing/test_collection_routing_map.py b/sdk/cosmos/azure-cosmos/tests/routing/test_collection_routing_map.py index d4f261ee29ae..bfb6442d91ff 100644 --- a/sdk/cosmos/azure-cosmos/tests/routing/test_collection_routing_map.py +++ b/sdk/cosmos/azure-cosmos/tests/routing/test_collection_routing_map.py @@ -7,7 +7,11 @@ import pytest import azure.cosmos._routing.routing_range as routing_range -from azure.cosmos._routing.collection_routing_map import CollectionRoutingMap, _build_routing_map_from_ranges +from azure.cosmos._routing.collection_routing_map import ( + CollectionRoutingMap, + _build_routing_map_from_ranges, + _OverlapDetected, +) @pytest.mark.cosmosEmulator @@ -449,6 +453,142 @@ def test_build_routing_map_stores_etag(self): result_none_etag = _build_routing_map_from_ranges(ranges, 'coll1', None, 'link', _logger) self.assertIsNone(result_none_etag.change_feed_etag) + # ========================================================================== + # Regression tests for ValueError("Ranges overlap") triggered by transient + # /pkranges snapshot inconsistencies at high partition counts. 
+ # + # These tests reproduce three concrete failure modes observed against a + # customer container with ~25.6k physical partitions during a long-running + # async Spark scan. + # + # The desired behaviour at the builder layer is one of two outcomes per + # mode, never a bare ``ValueError`` escaping to the caller (which would + # otherwise reach ``get_overlapping_ranges`` where a ``None`` routing map + # silently returns an empty list — a correctness bug, not a crash): + # + # * Mode 1 (duplicate range across pages) — the builder must dedup by + # id BEFORE validation and succeed, since duplicates describe the + # same logical range. + # + # * Modes 2 and 3 (stale parent / surviving grandparent with broken + # intermediate references) -- the builder cannot reconstruct what is + # not in the data, so it must convert the unrecoverable-from-this- + # snapshot ``ValueError`` into a typed ``_OverlapDetected`` sentinel. + # The caller (`_fetch_routing_map`) catches the sentinel, retries + # after a brief backoff, and ultimately surfaces a typed transient + # HTTP 503 if the inconsistency persists across the retry budget. + # ========================================================================== + + def test_full_load_dedups_duplicate_range_id_across_pages_mode_1(self): + """Mode 1 (duplicate range across pages): when /pkranges pagination + returns the same range id on two consecutive pages because two gateway + nodes serve from slightly different cached snapshots, the SDK should + dedup by id (last-write-wins) before validating and produce a valid + routing map — not raise.""" + _logger = logging.getLogger("test") + # Range id '1' appears twice because page boundary fell between two + # gateway nodes with one-tick-out-of-sync caches. 
+ ranges = [ + {'id': '0', 'minInclusive': '', 'maxExclusive': '40'}, + {'id': '1', 'minInclusive': '40', 'maxExclusive': '80'}, + {'id': '1', 'minInclusive': '40', 'maxExclusive': '80'}, # duplicate from next page + {'id': '2', 'minInclusive': '80', 'maxExclusive': 'C0'}, + {'id': '3', 'minInclusive': 'C0', 'maxExclusive': 'FF'}, + ] + result = _build_routing_map_from_ranges(ranges, 'coll1', '"etag-dup"', 'dbs/db/colls/coll1', _logger) + + self.assertIsNotNone( + result, + "Duplicate range id across pages should be deduped, not crash the builder." + ) + ids = [r['id'] for r in result._orderedPartitionKeyRanges] + self.assertEqual(ids, ['0', '1', '2', '3']) + + def test_full_load_raises_overlap_sentinel_for_stale_parent_with_missing_child_refs_mode_2(self): + """Mode 2 (stale parent, children missing parent reference): when a + gateway node returns a freshly-split parent alongside its children but + the children's 'parents' fields fail to reference the parent (because + the lineage metadata hadn't fully propagated when that node served the + page), _build_routing_map_from_ranges should convert the underlying + ValueError into _OverlapDetected so the caller can retry — and the + sentinel must NOT be a plain ValueError, since a plain ValueError + would escape to the cache layer and silently return empty results + from get_overlapping_ranges.""" + _logger = logging.getLogger("test") + # Parent '10' was split into '10/0' and '10/1', but the children on + # this page lost their 'parents': ['10'] reference. The one-direction + # parent filter cannot remove '10' because no surviving range names it. 
+ ranges = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, # unaffected left neighbor + {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, # stale parent + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, # SHOULD have parents=['10'] + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, # SHOULD have parents=['10'] + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, # unaffected right neighbor + ] + with self.assertRaises(_OverlapDetected): + _build_routing_map_from_ranges(ranges, 'coll1', '"etag-stale-parent"', 'dbs/db/colls/coll1', _logger) + + def test_full_load_raises_overlap_sentinel_for_grandparent_surviving_cascade_split_mode_3(self): + """Mode 3 (grandparent surviving a cascade split): when two generations + of splits have completed and the intermediate parent drops its + reference to the grandparent, the grandparent survives every available + defense and overlaps its grandchildren. _build_routing_map_from_ranges + should convert this into _OverlapDetected so the caller can retry.""" + _logger = logging.getLogger("test") + # '10' split into '10/0' and '10/1'; then '10/0' split into '10/0/0' + # and '10/0/1'. The grandchildren reference '10/0' correctly, but + # '10/0' and '10/1' both lost their 'parents': ['10'] reference, so + # the parent filter only collects {'10/0'} and leaves '10' in place. 
+ ranges = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, # grandparent + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, # SHOULD have parents=['10'] + {'id': '10/0/0', 'minInclusive': '80', 'maxExclusive': '88', 'parents': ['10/0']}, + {'id': '10/0/1', 'minInclusive': '88', 'maxExclusive': '90', 'parents': ['10/0']}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, # SHOULD have parents=['10'] + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + with self.assertRaises(_OverlapDetected): + _build_routing_map_from_ranges(ranges, 'coll1', '"etag-cascade"', 'dbs/db/colls/coll1', _logger) + + def test_overlap_sentinel_is_not_a_value_error(self): + """``_OverlapDetected`` must not be a subclass of ``ValueError`` (or any + other exception type that callers in the cache layer have historically + caught and swallowed). If it were, the very catch sites that today let + the bug crash the scan would silently absorb the sentinel and convert + it into the same empty-result correctness bug we were trying to avoid. + + The sentinel must be plainly an ``Exception``, distinguishable by + type, so the dedicated retry loop in ``_fetch_routing_map`` is the + only handler.""" + self.assertFalse( + issubclass(_OverlapDetected, ValueError), + "_OverlapDetected must not inherit from ValueError -- that would " + "allow legacy ValueError catches to absorb the retry signal." + ) + # Should still be a concrete Exception subclass (sanity check). + self.assertTrue(issubclass(_OverlapDetected, Exception)) + + def test_overlap_error_message_identifies_offending_ranges(self): + """When is_complete_set_of_range is called directly with genuinely + overlapping input (i.e. 
an unrecoverable programmer error rather than + a transient gateway snapshot), the ValueError message should identify + which two ranges overlapped so that whoever investigates the next + occurrence has actionable diagnostics out of the box.""" + ranges = [ + {'id': 'A', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': 'B', 'minInclusive': '40', 'maxExclusive': 'FF'}, # overlaps with A + ] + with self.assertRaises(ValueError) as ctx: + CollectionRoutingMap.is_complete_set_of_range(ranges) + + msg = str(ctx.exception) + self.assertIn('overlap', msg.lower()) + self.assertIn('A', msg, "Error message should name the previous (offending) range id.") + self.assertIn('B', msg, "Error message should name the current (offending) range id.") + self.assertIn('80', msg, "Error message should include the previous range's maxExclusive.") + self.assertIn('40', msg, "Error message should include the current range's minInclusive.") + if __name__ == '__main__': unittest.main() diff --git a/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit.py b/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit.py index e6b203ae12e8..7d87879c74bc 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit.py +++ b/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit.py @@ -7,16 +7,25 @@ - Empty change feed response (304 Not Modified / zero ranges from incremental update) """ +import logging import threading import time import unittest -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest -from azure.cosmos._routing.routing_map_provider import PartitionKeyRangeCache +from azure.cosmos._routing._routing_map_provider_common import ( + _OVERLAP_RETRY_INITIAL_BACKOFF_SECONDS, + _handle_overlap_retry_decision, +) +from azure.cosmos._routing.routing_map_provider import ( + PartitionKeyRangeCache, + _OVERLAP_RETRY_MAX_ATTEMPTS, +) from azure.cosmos._routing.collection_routing_map import CollectionRoutingMap from 
azure.cosmos import http_constants +from azure.cosmos.exceptions import CosmosHttpResponseError from azure.cosmos._gone_retry_policy_base import _PartitionKeyRangeGoneRetryPolicyBase @@ -618,6 +627,208 @@ def read_pk_ranges_cascading(collection_link, options, response_hook=None, **kwa self.assertEqual(ids, ['4', '5', '3', '1']) self.assertEqual(result.change_feed_etag, '"etag-old"') + # ========================================================================== + # End-to-end retry-loop tests for transient /pkranges snapshot inconsistency + # on the SYNC provider. Mirrors the async equivalents to guarantee both + # providers stay in lockstep on this contract: the cache-load pipeline + # never lets a bare ValueError("Ranges overlap") escape -- it either + # recovers on retry, or surfaces a typed CosmosHttpResponseError(503) the + # upstream retry policy already knows how to handle. + # ========================================================================== + + # ========================================================================== + # Unit tests for the overlap-retry policy helper. These pin the contract + # that the returned backoff is always within the deterministic upper bound + # (so the worst-case-wall-time guarantee in the public docs holds) and that + # jitter is actually applied (so concurrent retriers in different Cosmos + # clients -- e.g. several PySpark workers in the same process -- do not + # retry in lockstep on the same gateway node). + # ========================================================================== + + def test_overlap_retry_backoff_is_within_deterministic_upper_bound(self): + """For each non-terminal attempt, the returned backoff must lie inside + ``[0, deterministic_bound]`` where the deterministic bound is the + documented exponential schedule (0.5s, 1.0s, 2.0s). This is what + guarantees we never exceed the advertised worst-case wall time.""" + # Build a fresh logger so we don't compete for handlers with the real one. 
+ test_logger = logging.getLogger(__name__ + ".jitter_bounds_test") + + # _OVERLAP_RETRY_MAX_ATTEMPTS is 3, so non-terminal attempts are 1 and 2. + # (Attempt 3 raises 503; that branch is exercised by the e2e test below.) + for attempt_index, expected_upper_bound in [(1, 0.5), (2, 1.0)]: + for _ in range(50): # 50 draws per attempt -- catches an + # accidentally-constant return value as well. + backoff = _handle_overlap_retry_decision( + overlap_attempt_count=attempt_index, + collection_link="dbs/db1/colls/coll1", + logger=test_logger, + ) + self.assertGreaterEqual( + backoff, 0.0, + "Jittered backoff must be non-negative (random.uniform " + "with low=0 invariant)." + ) + self.assertLessEqual( + backoff, expected_upper_bound, + "Jittered backoff for attempt {} must not exceed the " + "deterministic upper bound {}s; got {}s. This would " + "violate the worst-case wall-time contract documented " + "on _handle_overlap_retry_decision.".format( + attempt_index, expected_upper_bound, backoff, + ) + ) + + def test_overlap_retry_backoff_actually_varies_between_calls(self): + """Two consecutive calls for the same attempt index must not return + the same value with overwhelming probability -- otherwise jitter has + regressed to a fixed backoff and concurrent retriers in different + Cosmos clients will land back on the same gateway node in lockstep. + + We draw N samples and assert at least two distinct values. 
The + probability of all-identical draws from ``random.uniform(0, 0.5)`` is + effectively zero in 50 draws, so this is not a flake risk; but if a + future refactor accidentally returns the deterministic backoff, this + test fires loudly.""" + test_logger = logging.getLogger(__name__ + ".jitter_variance_test") + samples = [ + _handle_overlap_retry_decision( + overlap_attempt_count=1, + collection_link="dbs/db1/colls/coll1", + logger=test_logger, + ) + for _ in range(50) + ] + self.assertGreater( + len(set(samples)), 1, + "Overlap-retry backoff produced identical values across 50 draws " + "-- jitter has likely regressed. Each draw should be an " + "independent random.uniform(0, deterministic_bound) sample." + ) + + def test_overlap_retry_raises_503_at_attempt_budget_exhaustion(self): + """At the documented attempt budget (_OVERLAP_RETRY_MAX_ATTEMPTS), + the helper must raise CosmosHttpResponseError(503) -- not return a + backoff. This is the only branch that surfaces an exception to the + caller, and it is what lets the upstream Cosmos retry policy take + over instead of the SDK silently giving up. 
+ + Also exercised end-to-end by + ``test_fetch_routing_map_surfaces_503_after_persistent_overlap``, + but this is the focused unit-level guard.""" + test_logger = logging.getLogger(__name__ + ".jitter_budget_test") + with self.assertRaises(CosmosHttpResponseError) as ctx: + _handle_overlap_retry_decision( + overlap_attempt_count=_OVERLAP_RETRY_MAX_ATTEMPTS, + collection_link="dbs/db1/colls/coll1", + logger=test_logger, + ) + self.assertEqual( + ctx.exception.status_code, + http_constants.StatusCodes.SERVICE_UNAVAILABLE, + ) + + # ========================================================================== + # End-to-end retry-loop tests below ↓ + # ========================================================================== + + def test_fetch_routing_map_recovers_after_transient_overlap(self): + """When the gateway returns an inconsistent paginated /pkranges snapshot + once and a consistent one on retry, the sync cache should populate + cleanly on the second attempt — the customer sees no crash, no missing + rows, just a brief stall.""" + # First call: Mode 2 payload (stale parent + children missing parent ref). + bad_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, # stale parent + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, # missing parents=['10'] + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, # missing parents=['10'] + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + # Second call: consistent snapshot, lineage metadata correctly propagated. 
+ good_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90', 'parents': ['10']}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0', 'parents': ['10']}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + + responses = [bad_payload, good_payload] + call_count = {'n': 0} + + client = MagicMock() + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + payload = responses[call_count['n']] if call_count['n'] < len(responses) else good_payload + call_count['n'] += 1 + headers = {http_constants.HttpHeaders.ETag: '"etag-{}"'.format(call_count['n'])} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + return iter(payload) + + client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + cache = PartitionKeyRangeCache(client) + + # Patch time.sleep so the test does not actually wait the backoff. + with patch('azure.cosmos._routing.routing_map_provider.time.sleep', return_value=None): + result = cache.get_routing_map("dbs/db1/colls/coll1", feed_options={}) + + self.assertIsNotNone( + result, + "Sync cache should populate after the transient overlap clears on retry." + ) + self.assertEqual( + call_count['n'], 2, + "Expected exactly one retry: one failed fetch + one successful fetch." + ) + ids = [r['id'] for r in result._orderedPartitionKeyRanges] + self.assertEqual(ids, ['L', '10/0', '10/1', 'R']) + + def test_fetch_routing_map_surfaces_503_after_persistent_overlap(self): + """If the gateway keeps returning inconsistent snapshots across every + retry attempt on the sync provider, the cache must NOT silently return + empty results from get_overlapping_ranges (correctness bug). 
It must + surface a typed transient HTTP error so the upstream retry policy can + decide what to do.""" + bad_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + call_count = {'n': 0} + client = MagicMock() + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + call_count['n'] += 1 + headers = {http_constants.HttpHeaders.ETag: '"etag-bad"'} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + return iter(bad_payload) + + client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + cache = PartitionKeyRangeCache(client) + + with patch('azure.cosmos._routing.routing_map_provider.time.sleep', return_value=None): + with self.assertRaises(CosmosHttpResponseError) as ctx: + cache.get_routing_map("dbs/db1/colls/coll1", feed_options={}) + + self.assertEqual( + ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE, + "Persistent overlap must surface as HTTP 503 (transient), not as a bare ValueError " + "or as a silent empty-result return." + ) + self.assertEqual( + call_count['n'], _OVERLAP_RETRY_MAX_ATTEMPTS, + "Should have made exactly _OVERLAP_RETRY_MAX_ATTEMPTS fetch attempts before giving up." 
+ ) + if __name__ == "__main__": unittest.main() diff --git a/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit_async.py b/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit_async.py index aa4a06eb9ebc..7a9465a42baf 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit_async.py +++ b/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit_async.py @@ -9,14 +9,22 @@ import asyncio import unittest -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest from azure.cosmos.aio import CosmosClient # noqa: F401 - needed to resolve circular imports -from azure.cosmos._routing.aio.routing_map_provider import PartitionKeyRangeCache +from azure.cosmos._routing.aio.routing_map_provider import ( + PartitionKeyRangeCache, + _OVERLAP_RETRY_MAX_ATTEMPTS, +) from azure.cosmos._routing.collection_routing_map import CollectionRoutingMap +from azure.cosmos._routing._routing_map_provider_common import ( + process_fetched_ranges, + _IncrementalMergeFailed, +) from azure.cosmos import http_constants +from azure.cosmos.exceptions import CosmosHttpResponseError from azure.cosmos._gone_retry_policy_base import _PartitionKeyRangeGoneRetryPolicyBase @@ -489,6 +497,185 @@ async def async_gen(): self.assertEqual(ids, ['4', '5', '3', '1']) self.assertEqual(result.change_feed_etag, '"etag-old"') + # ========================================================================== + # End-to-end retry-loop tests for transient /pkranges snapshot inconsistency. + # + # These cover the integration between the builder (which converts the + # underlying ValueError("Ranges overlap") into the _OverlapDetected + # sentinel) and _fetch_routing_map (which catches the sentinel, sleeps + # briefly, and re-fetches). The customer-observed failure mode is that + # one paginated /pkranges response can be internally inconsistent (e.g. 
+ # a stale parent appearing alongside its children with missing parent + # references); a small retry budget with backoff gives the gateway time + # to converge before the SDK surfaces a transient HTTP 503. + # ========================================================================== + + async def test_fetch_routing_map_recovers_after_transient_overlap_async(self): + """When the gateway returns an inconsistent paginated /pkranges snapshot + once and a consistent one on retry, the cache should populate cleanly + with the consistent data on the second attempt — the customer sees no + crash, no missing rows, just a brief stall.""" + # First call: Mode 2 payload (stale parent + children missing parent ref) → triggers _OverlapDetected. + bad_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, # stale parent + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, # missing parents=['10'] + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, # missing parents=['10'] + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + # Second call: same logical topology, but with the lineage metadata correctly + # propagated — gateway has now rotated to a consistent snapshot. 
+ good_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90', 'parents': ['10']}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0', 'parents': ['10']}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + + responses = [bad_payload, good_payload] + call_count = {'n': 0} + + client = MagicMock() + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + payload = responses[call_count['n']] if call_count['n'] < len(responses) else good_payload + call_count['n'] += 1 + headers = {http_constants.HttpHeaders.ETag: '"etag-{}"'.format(call_count['n'])} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + + async def async_gen(): + for r in payload: + yield r + + return async_gen() + + client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + cache = PartitionKeyRangeCache(client) + + # Patch asyncio.sleep so the test does not actually wait the backoff. + async def _no_sleep(_seconds): + return None + + with patch( + 'azure.cosmos._routing.aio.routing_map_provider.asyncio.sleep', + new=_no_sleep, + ): + result = await cache.get_routing_map("dbs/db1/colls/coll1", feed_options={}) + + self.assertIsNotNone( + result, + "Cache should populate after the transient overlap clears on retry." + ) + self.assertEqual( + call_count['n'], 2, + "Expected exactly one retry: one failed fetch + one successful fetch." + ) + ids = [r['id'] for r in result._orderedPartitionKeyRanges] + # Post-fix expected ordering: L, 10/0, 10/1, R (the stale parent '10' + # is correctly filtered on the consistent retry payload). 
+ self.assertEqual(ids, ['L', '10/0', '10/1', 'R']) + + async def test_fetch_routing_map_surfaces_503_after_persistent_overlap_async(self): + """If the gateway keeps returning inconsistent snapshots through every + retry attempt, the cache should NOT silently return empty results from + get_overlapping_ranges (which would be a correctness bug masquerading + as zero data). It must surface a typed transient HTTP error so the + upstream retry policy can decide what to do.""" + bad_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + call_count = {'n': 0} + client = MagicMock() + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + call_count['n'] += 1 + headers = {http_constants.HttpHeaders.ETag: '"etag-bad"'} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + + async def async_gen(): + for r in bad_payload: + yield r + + return async_gen() + + client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + cache = PartitionKeyRangeCache(client) + + async def _no_sleep(_seconds): + return None + + with patch( + 'azure.cosmos._routing.aio.routing_map_provider.asyncio.sleep', + new=_no_sleep, + ): + with self.assertRaises(CosmosHttpResponseError) as ctx: + await cache.get_routing_map("dbs/db1/colls/coll1", feed_options={}) + + self.assertEqual( + ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE, + "Persistent overlap must surface as HTTP 503 (transient), not as a bare ValueError " + "or as a silent empty-result return." + ) + # We should have exhausted the full retry budget (3 attempts by default). 
+ self.assertEqual( + call_count['n'], _OVERLAP_RETRY_MAX_ATTEMPTS, + "Should have made exactly _OVERLAP_RETRY_MAX_ATTEMPTS fetch attempts before giving up." + ) + + async def test_incremental_overlap_converts_to_incremental_merge_failed_async(self): + """If the incremental-merge path produces overlapping ranges (e.g. the + delta contains a range whose key span overlaps an existing cached + range without either side declaring the other a parent), the + ``ValueError("Ranges overlap")`` raised by ``try_combine`` must NOT + escape to the caller. It must convert to ``_IncrementalMergeFailed`` + so the standard fallback path takes over (retry incremental once, + then full-load — which has its own ``_OverlapDetected`` handler). + This is what guarantees the customer never observes a bare + ``ValueError`` from any of the validator's call sites.""" + + # Existing cached map: '0' covers ['', '80'] and '1' covers ['80', 'FF']. + previous_map = CollectionRoutingMap.CompleteRoutingMap( + [ + ({'id': '0', 'minInclusive': '', 'maxExclusive': '80'}, True), + ({'id': '1', 'minInclusive': '80', 'maxExclusive': 'FF'}, True), + ], + 'coll1', '"etag-prev"' + ) + + # Delta: + # - '0' re-declared with the same span (resolves via the existing + # ``known_range_info_by_id`` lookup — no parents needed). + # - '2' with ``parents=['1']`` and a span that overlaps '0'. The + # parent-resolution loop succeeds because '1' is in the cache, + # so we reach ``try_combine``. Once '1' is removed as the gone + # parent, the merged map is { '0' ('', '80'), '2' ('40', 'FF') } + # — '0' overlaps '2' on ['40', '80'], so ``is_complete_set_of_range`` + # raises ``ValueError("Ranges overlap: ...")`` from inside + # ``try_combine``. 
+ bad_delta = [ + {'id': '0', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '2', 'minInclusive': '40', 'maxExclusive': 'FF', 'parents': ['1']}, + ] + + # The wrapper around try_combine must absorb the ValueError and convert + # it to _IncrementalMergeFailed for the caller's retry loop. + with self.assertRaises(_IncrementalMergeFailed): + process_fetched_ranges( + bad_delta, previous_map, 'coll1', 'dbs/db1/colls/coll1', '"etag-new"' + ) + if __name__ == "__main__": unittest.main() From 15a1c68e9a30002d8ce45aef67c3c7afa50ed1ec Mon Sep 17 00:00:00 2001 From: dibahlfi <106994927+dibahlfi@users.noreply.github.com> Date: Fri, 22 May 2026 22:38:09 -0500 Subject: [PATCH 2/3] fixing copilot comments --- sdk/cosmos/azure-cosmos/CHANGELOG.md | 1 + .../_routing/_routing_map_provider_common.py | 166 ++++------ .../_routing/aio/routing_map_provider.py | 32 +- .../cosmos/_routing/collection_routing_map.py | 115 ++++--- .../cosmos/_routing/routing_map_provider.py | 38 ++- sdk/cosmos/azure-cosmos/cspell.json | 4 + .../routing/test_collection_routing_map.py | 106 +++--- .../tests/test_routing_map_provider_unit.py | 306 ++++++++++++++++-- .../test_routing_map_provider_unit_async.py | 149 +++++++-- 9 files changed, 649 insertions(+), 268 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 872e6a9ee37c..039c62deb91c 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -10,6 +10,7 @@ #### Bugs Fixed * Fixed bug where `CosmosClient` construction with AAD credentials would crash at startup if the semantic reranking inference endpoint environment variable was not set, even when semantic reranking was not being used. The inference service is now lazily initialized on first use. 
See [PR 46243](https://github.com/Azure/azure-sdk-for-python/pull/46243) * Fixed bug where region names in `preferred_locations` and `excluded_locations` (client-level and per-request) were not matched tolerantly for differences in case, whitespace, hyphens, and underscores. See [PR 46937](https://github.com/Azure/azure-sdk-for-python/pull/46937) +* Fixed bug where a `ValueError("Ranges overlap")` or an `AssertionError("code bug: returned overlapping ranges ... is empty")` from the partition key range cache could escape to the caller when the `/pkranges` response contained a transiently inconsistent snapshot (overlap or gap). See [PR 47091](https://github.com/Azure/azure-sdk-for-python/pull/47091) #### Other Changes * Reduced per-client memory overhead when partition-level circuit breaker (PPCB) is enabled by sharing the partition key range routing map cache across CosmosClient instances connected to the same endpoint, and stripping unused fields from cached partition key ranges using compact PKRange namedtuples. See [PR 46297](https://github.com/Azure/azure-sdk-for-python/pull/46297) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/_routing_map_provider_common.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/_routing_map_provider_common.py index 806be8544695..02959216860a 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/_routing_map_provider_common.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/_routing_map_provider_common.py @@ -36,7 +36,8 @@ from .collection_routing_map import ( CollectionRoutingMap, _build_routing_map_from_ranges, - _OverlapDetected, # noqa: F401 # re-exported for sync/async provider modules and tests + _OverlapDetected, # noqa: F401 # re-exported for provider modules and tests + _GapDetected, # noqa: F401 # re-exported for provider modules and tests ) from . 
import routing_range from .routing_range import ( @@ -50,115 +51,99 @@ PAGE_SIZE_CHANGE_FEED = "-1" # Return all available changes -# Number of times the full-load path will re-fetch ``/pkranges`` when the -# builder reports an overlap (``_OverlapDetected``). Overlap on the full-load -# path is treated as a transient gateway inconsistency, so a small fixed -# retry budget with backoff is preferred over surfacing immediately. After -# this many attempts the caller surfaces a transient HTTP 503 so the -# upstream retry policy can take over. -# -# Defined here (rather than in each provider module) so the sync and async -# providers cannot drift on the retry budget — both import the same constant. -_OVERLAP_RETRY_MAX_ATTEMPTS = 3 -# Initial backoff between overlap retries; doubles each attempt. Worst-case -# total sleep under the budget above is ~3.5s (0.5 + 1.0 + 2.0). -_OVERLAP_RETRY_INITIAL_BACKOFF_SECONDS = 0.5 +# Maximum retry attempts for transient full-load /pkranges inconsistencies +# (overlap OR gap) before surfacing a transient HTTP 503. Centralised here so +# the sync and async providers share one source of truth. +_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS = 3 +# Initial backoff between inconsistency retries; doubles each attempt. With +# ``_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS = 3``, attempt 3 raises 503 before +# sleeping, so only the post-attempt-1 and post-attempt-2 backoffs are slept -- +# deterministic worst case is the sum of the first two schedule entries +# (``_TRANSIENT_SNAPSHOT_RETRY_INITIAL_BACKOFF_SECONDS`` and that value +# doubled); expected wall time is half of that under full jitter. +_TRANSIENT_SNAPSHOT_RETRY_INITIAL_BACKOFF_SECONDS = 0.5 def _jittered_backoff(backoff_seconds: float) -> float: """Return a uniformly-jittered backoff in the range ``[0, backoff_seconds]``. - Implements the "full jitter" strategy: the actual sleep is drawn uniformly - from zero to the full deterministic backoff. 
This decorrelates concurrent - retriers (for example, multiple Cosmos clients running inside a single - PySpark process that all hit the same gateway node on the same bad - ``/pkranges`` snapshot at the same instant) so they do not retry in - lockstep and re-collide on the same gateway node. + Implements the "full jitter" strategy so concurrent retriers in different + processes do not retry in lockstep against the same gateway node. - The worst-case sleep per attempt is unchanged (still bounded by the - deterministic backoff), so the documented retry-budget contract still - holds; the expected per-attempt sleep is half of it. + :param float backoff_seconds: Deterministic upper bound for the backoff, + in seconds. Must be non-negative. + :return: A uniformly-distributed sleep value in ``[0, backoff_seconds]``. + :rtype: float """ return random.uniform(0, backoff_seconds) -def _handle_overlap_retry_decision( +def _handle_transient_snapshot_retry_decision( *, - overlap_attempt_count: int, + retry_attempt_count: int, collection_link: str, logger: logging.Logger, # pylint: disable=redefined-outer-name ) -> float: - """Decide what to do after the full-load builder reported an overlap. - - Centralises the sync/async-identical retry policy. Returns the number of - seconds the caller should sleep before the next attempt. Raises - :class:`CosmosHttpResponseError` (HTTP 503) when the attempt budget has - been exhausted; the caller's existing retry policy then handles it as - a transient error. - - The returned sleep duration is jittered (see :func:`_jittered_backoff`) - so concurrent retriers do not retry in lockstep. The deterministic - backoff schedule (0.5s -> 1.0s -> 2.0s, doubling) defines the *upper - bound* of each attempt's sleep; the actual sleep is drawn uniformly - from ``[0, that upper bound]``. - - The caller is responsible for the actual sleep (sync ``time.sleep`` or - ``await asyncio.sleep``). 
Keeping the sleep at the call site is what - lets this helper stay free of concurrency-runtime assumptions — the - only line that has to differ between the sync and async providers. - - :param int overlap_attempt_count: Number of overlap attempts made so far, - including the one that just failed. Pass ``1`` after the first failure, - ``2`` after the second, etc. - :param str collection_link: Used in log messages and the 503 error body - so the caller knows which collection ran out of budget. - :param logging.Logger logger: Caller's module-level logger, so messages - appear under the right ``azure.cosmos._routing.*`` namespace. - :return: Jittered backoff seconds to sleep before retrying. Guaranteed - to be in ``[0, deterministic_backoff_for_attempt]``. + """Decide what to do after the full-load builder reported a transient + snapshot inconsistency (overlap or gap). + + Returns a jittered backoff for the caller to sleep before the next + attempt; raises :class:`CosmosHttpResponseError` (HTTP 503) once + ``_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS`` is reached. Under the default + budget the final attempt raises 503 before sleeping, so only the first + ``_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS - 1`` attempts produce a sleep + (deterministic upper bounds doubling from + ``_TRANSIENT_SNAPSHOT_RETRY_INITIAL_BACKOFF_SECONDS``). The caller + performs the actual sleep (``time.sleep`` vs ``await asyncio.sleep``), + which is the only line that differs between the sync and async + providers. + + :keyword int retry_attempt_count: Attempts so far, including the one + that just failed. Pass ``1`` after the first failure. + :keyword str collection_link: Used in log messages and the 503 body. + :keyword logging.Logger logger: Caller's module-level logger. + :return: Jittered backoff seconds in ``[0, deterministic_upper_bound]``. :rtype: float - :raises CosmosHttpResponseError: When ``overlap_attempt_count`` has reached - ``_OVERLAP_RETRY_MAX_ATTEMPTS``. 
Status code is 503 so the upstream - retry policy classifies it as transient. + :raises CosmosHttpResponseError: When the attempt budget is exhausted. """ - if overlap_attempt_count >= _OVERLAP_RETRY_MAX_ATTEMPTS: + if retry_attempt_count >= _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS: logger.error( - "Full-load routing-map fetch for collection '%s' detected " - "overlapping partition key ranges on every one of %d attempt(s). " - "Surfacing as transient HTTP 503 so the caller's retry policy " - "can take over.", + "Full-load routing-map fetch for collection '%s' detected a " + "transient snapshot inconsistency (overlap or gap) on every " + "one of %d attempt(s). Surfacing as transient HTTP 503 so the " + "caller's retry policy can take over.", collection_link, - overlap_attempt_count, + retry_attempt_count, ) raise CosmosHttpResponseError( status_code=http_constants.StatusCodes.SERVICE_UNAVAILABLE, message=( - "Failed to build routing map for collection '{}': " - "overlapping partition key ranges persisted across {} " + "Failed to build routing map for collection '{}': transient " + "snapshot inconsistency (overlap or gap) persisted across {} " "full-load attempt(s). Surfaced as a retryable transient " - "error so the upstream retry policy can take over, rather " - "than allowing the underlying ValueError to escape as a " - "fatal crash." - ).format(collection_link, overlap_attempt_count), + "error so the upstream retry policy can take over." + ).format(collection_link, retry_attempt_count), ) deterministic_backoff = ( - _OVERLAP_RETRY_INITIAL_BACKOFF_SECONDS * (2 ** (overlap_attempt_count - 1)) + _TRANSIENT_SNAPSHOT_RETRY_INITIAL_BACKOFF_SECONDS * (2 ** (retry_attempt_count - 1)) ) jittered_backoff = _jittered_backoff(deterministic_backoff) logger.warning( - "Full-load routing-map fetch for collection '%s' detected overlapping " - "partition key ranges (attempt %d/%d). 
Sleeping %.2fs (jittered from " - "upper bound %.2fs) and retrying.", + "Full-load routing-map fetch for collection '%s' detected a transient " + "snapshot inconsistency (overlap or gap) (attempt %d/%d). Sleeping " + "%.2fs (jittered from upper bound %.2fs) and retrying.", collection_link, - overlap_attempt_count, - _OVERLAP_RETRY_MAX_ATTEMPTS, + retry_attempt_count, + _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS, jittered_backoff, deterministic_backoff, ) return jittered_backoff + + def is_cache_unchanged_since_previous( collection_routing_map_by_item: Dict[str, CollectionRoutingMap], collection_id: str, @@ -263,7 +248,7 @@ def _resolve_endpoint(client: Any) -> str: class _IncrementalMergeFailed(Exception): - """Sentinel raised by :func:`process_fetched_ranges` when the + """Private exception type raised by :func:`process_fetched_ranges` when the incremental update cannot resolve all partition key ranges. The caller decides how to recover: retry the incremental fetch @@ -276,7 +261,7 @@ def process_fetched_ranges( collection_id: str, collection_link: str, new_etag: Optional[str], -) -> Optional[CollectionRoutingMap]: +) -> CollectionRoutingMap: """Turn raw PK-range results into a :class:`CollectionRoutingMap`. Handles both initial-load (when *previous_routing_map* is ``None``) @@ -291,10 +276,8 @@ def process_fetched_ranges( :param str collection_id: The ID of the collection. :param str collection_link: The link to the collection. :param str new_etag: The ETag from the change feed response, or ``None``. - :return: The new/updated routing map, or ``None`` when an - initial load yields no ranges. + :return: The new/updated routing map. :rtype: ~azure.cosmos._routing.collection_routing_map.CollectionRoutingMap - or None :raises _IncrementalMergeFailed: When the incremental path cannot resolve all ranges. The caller catches this and either retries the incremental fetch or falls back to a full refresh. 
@@ -374,22 +357,15 @@ def process_fetched_ranges( try: result = previous_routing_map.try_combine(range_tuples, effective_etag) except ValueError as overlap_error: - # ``try_combine`` validates the merged map via - # ``CollectionRoutingMap.is_complete_set_of_range`` and raises - # ``ValueError("Ranges overlap: ...")`` if the merge produces a - # self-contradictory tiling. This can happen during the incremental - # path when the delta contains a range whose key span overlaps an - # existing cached range without either side declaring the other a - # parent. - # - # We must NOT let this ``ValueError`` escape: the cache layer above - # treats a ``None`` routing map as "no ranges" and would convert - # the bare exception into a silent empty-result return at - # ``get_overlapping_ranges``. Convert to ``_IncrementalMergeFailed`` - # so the caller's existing retry loop retries the incremental fetch - # once and then falls back to the full-load path, which has its own - # ``_OverlapDetected`` handler with retry+backoff and surfaces a - # transient HTTP 503 if the inconsistency persists. + # Convert the overlap ``ValueError`` from ``try_combine`` into + # ``_IncrementalMergeFailed`` so the caller retries the incremental + # fetch and ultimately falls back to the full-load path (which has + # its own ``_OverlapDetected`` retry + 503 safety net). Narrow to + # the literal ``"Ranges overlap"`` prefix (kept stable in + # ``is_complete_set_of_range`` and pinned by the regression tests) + # so any future unrelated ``ValueError`` surfaces as a real bug. + if not str(overlap_error).startswith("Ranges overlap"): + raise logger.warning( "Incremental merge for collection '%s' produced overlapping ranges: %s. 
" "Converting to _IncrementalMergeFailed so the caller retries / " diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py index 33129e95ae21..4d4c0ca84e80 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py @@ -39,8 +39,8 @@ get_smart_overlapping_ranges, _IncrementalMergeFailed, _OverlapDetected, - _OVERLAP_RETRY_MAX_ATTEMPTS, # noqa: F401 # re-exported for tests - _handle_overlap_retry_decision, + _GapDetected, + _handle_transient_snapshot_retry_decision, ) @@ -107,9 +107,7 @@ # before falling back to a full routing-map refresh. _INCOMPLETE_ROUTING_MAP_MAX_RETRIES = 1 -# Overlap-retry budget and backoff live in ``_routing_map_provider_common`` so -# the sync and async providers cannot drift on them. ``_OVERLAP_RETRY_MAX_ATTEMPTS`` -# is re-exported through this module for test imports. + class PartitionKeyRangeCache(object): """ PartitionKeyRangeCache provides list of effective partition key ranges for a @@ -350,7 +348,7 @@ async def _fetch_routing_map( """ current_previous_map = previous_routing_map incomplete_attempt_count = 0 - overlap_attempt_count = 0 + inconsistency_attempt_count = 0 while True: request_kwargs = dict(kwargs) @@ -406,18 +404,16 @@ async def _fetch_routing_map( continue raise - except _OverlapDetected: - # The full-load builder reported overlapping ranges. Apply - # the retry-on-overlap policy: ``_handle_overlap_retry_decision`` - # either returns a backoff to sleep or raises ``CosmosHttpResponseError`` - # (503) when the attempt budget is exhausted. Reset - # ``current_previous_map`` to ``None`` so the next iteration - # runs the full-load path regardless of which path tripped - # the overlap — we do not want to keep retrying an incremental - # fetch against the same inconsistent base. 
- overlap_attempt_count += 1 - backoff = _handle_overlap_retry_decision( - overlap_attempt_count=overlap_attempt_count, + except (_OverlapDetected, _GapDetected): + # Reset ``current_previous_map`` to ``None`` so the next + # iteration runs the full-load path: we do not want to keep + # retrying an incremental fetch against the same inconsistent + # base. ``_handle_transient_snapshot_retry_decision`` returns + # the backoff or raises a 503 once the attempt budget is + # exhausted. + inconsistency_attempt_count += 1 + backoff = _handle_transient_snapshot_retry_decision( + retry_attempt_count=inconsistency_attempt_count, collection_link=collection_link, logger=logger, ) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/collection_routing_map.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/collection_routing_map.py index 555737822685..5d5bd1983106 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/collection_routing_map.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/collection_routing_map.py @@ -31,26 +31,30 @@ class _OverlapDetected(Exception): - """Sentinel raised by :func:`_build_routing_map_from_ranges` when the - ``/pkranges`` response contains overlapping ranges that the SDK cannot - reconcile from a single snapshot. - - Distinct from a plain ``ValueError`` so the caller can identify the - overlap case explicitly and apply the retry-on-overlap policy instead - of treating it as a fatal programmer error. Overlap on this path is - expected to be transient (a paginated ``/pkranges`` response that is - not snapshot-isolated across gateway nodes), so a short retry budget - with backoff is sufficient. - - The caller (``_fetch_routing_map`` on both sync and async cache - providers) catches this sentinel, sleeps briefly, and retries the - fetch. After a small number of failed retries it surfaces a typed - transient HTTP error so the upstream retry policy can take over. 
The - sentinel is intentionally not allowed to escape to the query layer - as a bare ``ValueError`` — that path would otherwise convert into a - silent empty-result return at ``get_overlapping_ranges`` (which - treats a ``None`` routing map as "no ranges"), masking the failure - as a correctness bug. + """Raised by :func:`_build_routing_map_from_ranges` to signal that the + gateway returned a ``/pkranges`` snapshot with overlapping ranges. + + Intentionally NOT a ``ValueError`` subclass: cache-layer code historically + catches ``ValueError`` broadly, so a plain ``ValueError`` would be + silently swallowed and surface later as an empty result from + ``get_overlapping_ranges``. Each provider's ``_fetch_routing_map`` + catches this type explicitly and applies the retry-on-overlap policy. + """ + + +class _GapDetected(Exception): + """Raised by :func:`_build_routing_map_from_ranges` to signal that the + gateway returned a ``/pkranges`` snapshot with a hole in the key space + (i.e. one range's upper bound is strictly less than the next range's + lower bound, leaving some keys uncovered). + + Same root cause as :class:`_OverlapDetected` -- a transient gateway + snapshot served mid-propagation -- but the opposite symptom. Before + this type existed the gap escaped as ``AssertionError("code bug: returned overlapping + ranges ... is empty")`` from ``SmartRoutingMapProvider``'s generator + when the downstream query tried to use the empty result. It is now caught + alongside ``_OverlapDetected`` and given the same bounded retry + + typed HTTP 503 treatment so the upstream retry policy can absorb it. """ # pylint: disable=line-too-long @@ -305,15 +309,14 @@ def _build_routing_map_from_ranges( new_etag, collection_link: str, _logger -) -> Optional['CollectionRoutingMap']: +) -> 'CollectionRoutingMap': """Build a complete routing map from a full load of partition key ranges. Filters out parent (gone) ranges and validates that the remaining ranges - form a complete, gap-free partition key space. 
Returns None if the ranges - are incomplete (gap), and raises ``_OverlapDetected`` if the ranges - overlap — the caller is expected to retry the fetch on the overlap case, - since overlap on the full-load path is treated as a transient gateway - inconsistency rather than a permanent input error. + form a complete, gap-free partition key space. Raises ``_OverlapDetected`` + when the ranges overlap and ``_GapDetected`` when they have a gap; both + are transient gateway-snapshot inconsistencies the caller is expected to + retry. This is shared between the sync and async PartitionKeyRangeCache to avoid code duplication — the logic is purely synchronous. @@ -323,22 +326,22 @@ def _build_routing_map_from_ranges( :param str new_etag: The ETag from the change feed response. :param str collection_link: The collection link, used for log messages. :param logging.Logger _logger: Logger instance for error reporting. - :return: A complete CollectionRoutingMap, or None if the ranges are incomplete (gap). - :rtype: Optional[CollectionRoutingMap] + :return: A complete CollectionRoutingMap. + :rtype: CollectionRoutingMap :raises _OverlapDetected: If the ranges contain an overlap that could not be resolved from this single snapshot. The caller should retry the fetch; see :class:`_OverlapDetected` for the rationale. + :raises _GapDetected: If the ranges have a hole in the key space. The + caller should retry the fetch; see :class:`_GapDetected` for the + rationale. """ - # Dedup the input by id BEFORE parent filtering and validation. At high - # partition counts the /pkranges response is paginated, and pagination - # is not snapshot-isolated across gateway nodes — consecutive pages can - # legitimately return the same range id when the page boundary falls - # between two nodes with one-tick-out-of-sync caches. Without this dedup - # the duplicate would survive into the sortedRanges list inside - # CompleteRoutingMap and trip the overlap check on two identical entries. 
- # Last-write-wins is safe: duplicates describe the same logical range - # and any later occurrence carries the same id, min/max, and (when - # present) the more-complete metadata such as ``parents``. + # Dedup the input by id before validation. Paginated ``/pkranges`` + # responses can repeat the same range id across pages when consecutive + # pages are served from gateway nodes with slightly different cached + # views; without dedup the duplicate trips the overlap check on two + # identical entries. Last-write-wins is safe in practice; if duplicates + # carry asymmetric metadata, the overlap path is handled by the + # ``_OverlapDetected`` retry flow. deduped_by_id: dict = {} for r in ranges: deduped_by_id[r[PartitionKeyRange.Id]] = r @@ -362,13 +365,14 @@ def _build_routing_map_from_ranges( new_etag ) except ValueError as overlap_error: - # ``is_complete_set_of_range`` raises ``ValueError("Ranges overlap: ...")`` - # when the post-filter range list still contains an overlap. Convert - # it to ``_OverlapDetected`` so the caller can apply the retry-on- - # overlap policy. The bare ``ValueError`` must NOT escape to the - # cache layer: that path converts into a silent empty-result return - # at ``get_overlapping_ranges`` (which treats ``None`` from the - # cache as "no ranges"), masking the failure as a correctness bug. + # Convert the overlap ``ValueError`` raised by ``is_complete_set_of_range`` + # into ``_OverlapDetected`` so the caller can apply the retry-on-overlap + # policy. Narrow to the literal ``"Ranges overlap"`` prefix (kept stable + # in ``is_complete_set_of_range`` and pinned by the regression tests) + # so any future unrelated ``ValueError`` from ``CompleteRoutingMap`` + # surfaces as a real bug instead of being silently coerced into a retry. + if not str(overlap_error).startswith("Ranges overlap"): + raise _logger.warning( "Full load of routing map for collection '%s' detected overlapping " "partition key ranges: %s. 
Signalling caller to retry the " @@ -378,12 +382,21 @@ def _build_routing_map_from_ranges( raise _OverlapDetected() from overlap_error if not routing_map: - _logger.error( - "Full load of routing map for collection '%s' failed: " - "the service returned an incomplete set of partition key ranges. " - "This can happen due to a transient service issue or a split completing mid-fetch.", - collection_link + # ``CompleteRoutingMap`` returns None when ``is_complete_set_of_range`` + # returns False without raising -- the gap case (``prev.max < cur.min``) + # or an empty input list. Same root cause as the overlap raise above + # (transient inconsistent gateway snapshot), opposite symptom: a hole + # in the key space rather than a duplicated one. Raise ``_GapDetected`` + # so the caller applies the same bounded retry + 503-on-exhaustion + # treatment as overlap. Without this raise, the returned + # ``None`` would surface as ``AssertionError("code bug: returned + # overlapping ranges ... is empty")`` from ``SmartRoutingMapProvider``. + _logger.warning( + "Full load of routing map for collection '%s' returned an " + "incomplete set of partition key ranges (gap in key space). " + "Signalling caller to retry the /pkranges fetch.", + collection_link, ) - return None + raise _GapDetected() return routing_map diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py index 8bf2c2e5951f..3b78cad9e2d4 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py @@ -28,7 +28,11 @@ from typing import Dict, Any, Optional, List, TYPE_CHECKING from azure.core.utils import CaseInsensitiveDict from .. 
import _base, http_constants -from .collection_routing_map import CollectionRoutingMap +from .collection_routing_map import ( + CollectionRoutingMap, + _OverlapDetected, + _GapDetected, +) from ..exceptions import CosmosHttpResponseError from ._routing_map_provider_common import ( _resolve_endpoint, @@ -38,9 +42,7 @@ determine_refresh_action, get_smart_overlapping_ranges, _IncrementalMergeFailed, - _OverlapDetected, - _OVERLAP_RETRY_MAX_ATTEMPTS, # noqa: F401 # re-exported for tests - _handle_overlap_retry_decision, + _handle_transient_snapshot_retry_decision, ) if TYPE_CHECKING: @@ -98,9 +100,7 @@ # before falling back to a full routing-map refresh. _INCOMPLETE_ROUTING_MAP_MAX_RETRIES = 1 -# Overlap-retry budget and backoff live in ``_routing_map_provider_common`` so -# the sync and async providers cannot drift on them. ``_OVERLAP_RETRY_MAX_ATTEMPTS`` -# is re-exported through this module for test imports. + class PartitionKeyRangeCache(object): """ PartitionKeyRangeCache provides list of effective partition key ranges for a @@ -322,7 +322,7 @@ def _fetch_routing_map( """ current_previous_map = previous_routing_map incomplete_attempt_count = 0 - overlap_attempt_count = 0 + inconsistency_attempt_count = 0 while True: request_kwargs = dict(kwargs) @@ -377,18 +377,16 @@ def _fetch_routing_map( continue raise - except _OverlapDetected: - # The full-load builder reported overlapping ranges. Apply - # the retry-on-overlap policy: ``_handle_overlap_retry_decision`` - # either returns a backoff to sleep or raises ``CosmosHttpResponseError`` - # (503) when the attempt budget is exhausted. Reset - # ``current_previous_map`` to ``None`` so the next iteration - # runs the full-load path regardless of which path tripped - # the overlap — we do not want to keep retrying an incremental - # fetch against the same inconsistent base. 
- overlap_attempt_count += 1 - backoff = _handle_overlap_retry_decision( - overlap_attempt_count=overlap_attempt_count, + except (_OverlapDetected, _GapDetected): + # Reset ``current_previous_map`` to ``None`` so the next + # iteration runs the full-load path: we do not want to keep + # retrying an incremental fetch against the same inconsistent + # base. ``_handle_transient_snapshot_retry_decision`` returns + # the backoff or raises a 503 once the attempt budget is + # exhausted. + inconsistency_attempt_count += 1 + backoff = _handle_transient_snapshot_retry_decision( + retry_attempt_count=inconsistency_attempt_count, collection_link=collection_link, logger=logger, ) diff --git a/sdk/cosmos/azure-cosmos/cspell.json b/sdk/cosmos/azure-cosmos/cspell.json index d71f63bc08b7..62b0df0f579f 100644 --- a/sdk/cosmos/azure-cosmos/cspell.json +++ b/sdk/cosmos/azure-cosmos/cspell.json @@ -2,7 +2,11 @@ "ignoreWords": [ "hdrh", "hdrhistogram", + "dedup", + "deduped", + "deduping", "dedupe", + "dedups", "perfdb", "perfresults", "pkrange", diff --git a/sdk/cosmos/azure-cosmos/tests/routing/test_collection_routing_map.py b/sdk/cosmos/azure-cosmos/tests/routing/test_collection_routing_map.py index bfb6442d91ff..cdf86550b6a8 100644 --- a/sdk/cosmos/azure-cosmos/tests/routing/test_collection_routing_map.py +++ b/sdk/cosmos/azure-cosmos/tests/routing/test_collection_routing_map.py @@ -11,6 +11,7 @@ CollectionRoutingMap, _build_routing_map_from_ranges, _OverlapDetected, + _GapDetected, ) @@ -416,16 +417,23 @@ def test_build_routing_map_no_parents_passes_through_all(self): ids = [r['id'] for r in result._orderedPartitionKeyRanges] self.assertEqual(ids, ['0', '1']) - def test_build_routing_map_returns_none_for_incomplete_ranges(self): - """_build_routing_map_from_ranges returns None when the filtered ranges - don't form a complete partition key space (gap exists).""" + def test_build_routing_map_raises_gap_detected_for_incomplete_ranges(self): + """_build_routing_map_from_ranges raises 
``_GapDetected`` when the + filtered ranges don't form a complete partition key space (gap exists). + The caller catches this and applies the same bounded-retry-then-503 + policy as ``_OverlapDetected``; without this raise the downstream + ``SmartRoutingMapProvider`` would crash with ``AssertionError("code + bug: returned overlapping ranges ... is empty")`` when the resulting + empty range list flows into its generator.""" _logger = logging.getLogger("test") ranges = [ {'id': '0', 'minInclusive': '', 'maxExclusive': '80'}, # Gap from '80' to 'FF' — incomplete ] - result = _build_routing_map_from_ranges(ranges, 'coll1', '"etag-4"', 'dbs/db/colls/coll1', _logger) - self.assertIsNone(result, "Should return None for incomplete range coverage") + with self.assertRaises(_GapDetected): + _build_routing_map_from_ranges( + ranges, 'coll1', '"etag-4"', 'dbs/db/colls/coll1', _logger + ) def test_build_routing_map_empty_parents_list_not_treated_as_gone(self): """_build_routing_map_from_ranges does NOT filter a range whose 'parents' @@ -454,33 +462,14 @@ def test_build_routing_map_stores_etag(self): self.assertIsNone(result_none_etag.change_feed_etag) # ========================================================================== - # Regression tests for ValueError("Ranges overlap") triggered by transient - # /pkranges snapshot inconsistencies at high partition counts. - # - # These tests reproduce three concrete failure modes observed against a - # customer container with ~25.6k physical partitions during a long-running - # async Spark scan. 
- # - # The desired behaviour at the builder layer is one of two outcomes per - # mode, never a bare ``ValueError`` escaping to the caller (which would - # otherwise reach ``get_overlapping_ranges`` where a ``None`` routing map - # silently returns an empty list — a correctness bug, not a crash): - # - # * Mode 1 (duplicate range across pages) — the builder must dedup by - # id BEFORE validation and succeed, since duplicates describe the - # same logical range. - # - # * Modes 2 and 3 (stale parent / surviving grandparent with broken - # intermediate references) -- the builder cannot reconstruct what is - # not in the data, so it must convert the unrecoverable-from-this- - # snapshot ``ValueError`` into a typed ``_OverlapDetected`` sentinel. - # The caller (`_fetch_routing_map`) catches the sentinel, retries - # after a brief backoff, and ultimately surfaces a typed transient - # HTTP 503 if the inconsistency persists across the retry budget. + # Regression tests for transient /pkranges snapshot inconsistencies. + # The builder must either succeed (after deduping duplicates by id) or + # raise ``_OverlapDetected`` — never let a bare ``ValueError`` reach + # ``get_overlapping_ranges``, which would silently return an empty list. 
# ========================================================================== def test_full_load_dedups_duplicate_range_id_across_pages_mode_1(self): - """Mode 1 (duplicate range across pages): when /pkranges pagination + """Duplicate range across pages: when /pkranges pagination returns the same range id on two consecutive pages because two gateway nodes serve from slightly different cached snapshots, the SDK should dedup by id (last-write-wins) before validating and produce a valid @@ -505,13 +494,13 @@ def test_full_load_dedups_duplicate_range_id_across_pages_mode_1(self): self.assertEqual(ids, ['0', '1', '2', '3']) def test_full_load_raises_overlap_sentinel_for_stale_parent_with_missing_child_refs_mode_2(self): - """Mode 2 (stale parent, children missing parent reference): when a + """Stale parent with children missing parent reference: when a gateway node returns a freshly-split parent alongside its children but the children's 'parents' fields fail to reference the parent (because the lineage metadata hadn't fully propagated when that node served the page), _build_routing_map_from_ranges should convert the underlying ValueError into _OverlapDetected so the caller can retry — and the - sentinel must NOT be a plain ValueError, since a plain ValueError + exception must NOT be a plain ValueError, since a plain ValueError would escape to the cache layer and silently return empty results from get_overlapping_ranges.""" _logger = logging.getLogger("test") @@ -529,7 +518,7 @@ def test_full_load_raises_overlap_sentinel_for_stale_parent_with_missing_child_r _build_routing_map_from_ranges(ranges, 'coll1', '"etag-stale-parent"', 'dbs/db/colls/coll1', _logger) def test_full_load_raises_overlap_sentinel_for_grandparent_surviving_cascade_split_mode_3(self): - """Mode 3 (grandparent surviving a cascade split): when two generations + """Grandparent surviving a cascade split: when two generations of splits have completed and the intermediate parent drops its reference to the 
grandparent, the grandparent survives every available defense and overlaps its grandchildren. _build_routing_map_from_ranges @@ -551,14 +540,45 @@ def test_full_load_raises_overlap_sentinel_for_grandparent_surviving_cascade_spl with self.assertRaises(_OverlapDetected): _build_routing_map_from_ranges(ranges, 'coll1', '"etag-cascade"', 'dbs/db/colls/coll1', _logger) + def test_full_load_raises_gap_detected_for_hole_in_key_space(self): + """Mirror of the overlap scenarios for the gap side: when one gateway + node has propagated the deletion of a parent but the children have + not yet appeared in its view, the response is missing the range that + used to cover the deleted parent's span. The builder must convert + this into ``_GapDetected`` so the caller applies the same bounded + retry. Without this, the empty ``get_overlapping_ranges`` result + crashes ``SmartRoutingMapProvider`` with ``AssertionError("code + bug: ...")``.""" + _logger = logging.getLogger("test") + # Parent '10' covered "80" -> "A0" and was just deleted; its + # children 10/0 and 10/1 have not yet propagated to this node's view. + ranges = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + # Hole "80" -> "A0" is unclaimed. + ] + with self.assertRaises(_GapDetected): + _build_routing_map_from_ranges(ranges, 'coll1', '"etag-gap"', 'dbs/db/colls/coll1', _logger) + + def test_gap_detected_is_not_a_value_error(self): + """Same invariant as for ``_OverlapDetected``: ``_GapDetected`` must + be distinguishable by type and must not be silently absorbed by any + ``except ValueError`` catch in the cache layer.""" + self.assertFalse( + issubclass(_GapDetected, ValueError), + "_GapDetected must not inherit from ValueError -- that would " + "allow legacy ValueError catches to absorb the retry signal." 
+ ) + self.assertTrue(issubclass(_GapDetected, Exception)) + def test_overlap_sentinel_is_not_a_value_error(self): """``_OverlapDetected`` must not be a subclass of ``ValueError`` (or any other exception type that callers in the cache layer have historically caught and swallowed). If it were, the very catch sites that today let - the bug crash the scan would silently absorb the sentinel and convert + the bug crash the scan would silently absorb the signal and convert it into the same empty-result correctness bug we were trying to avoid. - The sentinel must be plainly an ``Exception``, distinguishable by + The exception must be plainly an ``Exception``, distinguishable by type, so the dedicated retry loop in ``_fetch_routing_map`` is the only handler.""" self.assertFalse( @@ -574,7 +594,16 @@ def test_overlap_error_message_identifies_offending_ranges(self): overlapping input (i.e. an unrecoverable programmer error rather than a transient gateway snapshot), the ValueError message should identify which two ranges overlapped so that whoever investigates the next - occurrence has actionable diagnostics out of the box.""" + occurrence has actionable diagnostics out of the box. + + Also pins the literal ``"Ranges overlap"`` prefix on the message. + The full-load guard in ``_build_routing_map_from_ranges`` and the + incremental-merge guard in ``process_fetched_ranges`` both rely on + ``str(err).startswith("Ranges overlap")`` to distinguish the + snapshot-inconsistency case from any unrelated future ``ValueError``. 
+ If this prefix ever changes, those guards will silently start + re-raising what they were meant to convert, so the prefix is part + of the contract and gets asserted here.""" ranges = [ {'id': 'A', 'minInclusive': '', 'maxExclusive': '80'}, {'id': 'B', 'minInclusive': '40', 'maxExclusive': 'FF'}, # overlaps with A @@ -583,7 +612,12 @@ def test_overlap_error_message_identifies_offending_ranges(self): CollectionRoutingMap.is_complete_set_of_range(ranges) msg = str(ctx.exception) - self.assertIn('overlap', msg.lower()) + self.assertTrue( + msg.startswith("Ranges overlap"), + "Message must start with the literal 'Ranges overlap' prefix that " + "the production guards in _build_routing_map_from_ranges and " + "process_fetched_ranges match against. Got: {!r}".format(msg), + ) self.assertIn('A', msg, "Error message should name the previous (offending) range id.") self.assertIn('B', msg, "Error message should name the current (offending) range id.") self.assertIn('80', msg, "Error message should include the previous range's maxExclusive.") diff --git a/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit.py b/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit.py index 7d87879c74bc..cfcfb046cb0c 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit.py +++ b/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit.py @@ -16,12 +16,13 @@ import pytest from azure.cosmos._routing._routing_map_provider_common import ( - _OVERLAP_RETRY_INITIAL_BACKOFF_SECONDS, - _handle_overlap_retry_decision, + _handle_transient_snapshot_retry_decision, + _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS, + process_fetched_ranges, + _IncrementalMergeFailed, ) from azure.cosmos._routing.routing_map_provider import ( PartitionKeyRangeCache, - _OVERLAP_RETRY_MAX_ATTEMPTS, ) from azure.cosmos._routing.collection_routing_map import CollectionRoutingMap from azure.cosmos import http_constants @@ -296,9 +297,13 @@ def read_pk_ranges_empty(collection_link, options, 
response_hook=None, **kwargs) self.assertEqual(ids, ['0'], "Ranges should be preserved from previous map") self.assertEqual(result.change_feed_etag, '"etag-new"', "ETag should be updated") - def test_fetch_routing_map_empty_full_load_returns_none(self): - """_fetch_routing_map should return None when a full load (no previous - map) returns zero ranges — this means the service returned nothing.""" + def test_fetch_routing_map_empty_full_load_raises_503_after_budget(self): + """When a full load (no previous map) repeatedly returns zero ranges, + ``_fetch_routing_map`` should retry up to the transient-snapshot budget and then + surface ``CosmosHttpResponseError(status_code=503)`` rather than + returning ``None``. Empty ranges hit the same ``_GapDetected`` path as + a gap in the key space; without retry-then-503 the empty result + would later crash ``SmartRoutingMapProvider`` with ``AssertionError``.""" client = MagicMock() def read_pk_ranges_empty(collection_link, options, response_hook=None, **kwargs): @@ -310,14 +315,14 @@ def read_pk_ranges_empty(collection_link, options, response_hook=None, **kwargs) cache = PartitionKeyRangeCache(client) - result = cache._fetch_routing_map( - collection_link="dbs/db1/colls/coll1", - collection_id="dbs/db1/colls/coll1", - previous_routing_map=None, # Full load - feed_options={} - ) - - self.assertIsNone(result, "Full load with empty ranges should return None") + with self.assertRaises(CosmosHttpResponseError) as ctx: + cache._fetch_routing_map( + collection_link="dbs/db1/colls/coll1", + collection_id="dbs/db1/colls/coll1", + previous_routing_map=None, # Full load + feed_options={} + ) + self.assertEqual(ctx.exception.status_code, 503) def test_get_previous_routing_map_exact_key_finds_entry(self): @@ -653,13 +658,13 @@ def test_overlap_retry_backoff_is_within_deterministic_upper_bound(self): # Build a fresh logger so we don't compete for handlers with the real one. 
test_logger = logging.getLogger(__name__ + ".jitter_bounds_test") - # _OVERLAP_RETRY_MAX_ATTEMPTS is 3, so non-terminal attempts are 1 and 2. + # _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS is 3, so non-terminal attempts are 1 and 2. # (Attempt 3 raises 503; that branch is exercised by the e2e test below.) for attempt_index, expected_upper_bound in [(1, 0.5), (2, 1.0)]: for _ in range(50): # 50 draws per attempt -- catches an # accidentally-constant return value as well. - backoff = _handle_overlap_retry_decision( - overlap_attempt_count=attempt_index, + backoff = _handle_transient_snapshot_retry_decision( + retry_attempt_count=attempt_index, collection_link="dbs/db1/colls/coll1", logger=test_logger, ) @@ -673,7 +678,7 @@ def test_overlap_retry_backoff_is_within_deterministic_upper_bound(self): "Jittered backoff for attempt {} must not exceed the " "deterministic upper bound {}s; got {}s. This would " "violate the worst-case wall-time contract documented " - "on _handle_overlap_retry_decision.".format( + "on _handle_transient_snapshot_retry_decision.".format( attempt_index, expected_upper_bound, backoff, ) ) @@ -691,8 +696,8 @@ def test_overlap_retry_backoff_actually_varies_between_calls(self): test fires loudly.""" test_logger = logging.getLogger(__name__ + ".jitter_variance_test") samples = [ - _handle_overlap_retry_decision( - overlap_attempt_count=1, + _handle_transient_snapshot_retry_decision( + retry_attempt_count=1, collection_link="dbs/db1/colls/coll1", logger=test_logger, ) @@ -706,7 +711,7 @@ def test_overlap_retry_backoff_actually_varies_between_calls(self): ) def test_overlap_retry_raises_503_at_attempt_budget_exhaustion(self): - """At the documented attempt budget (_OVERLAP_RETRY_MAX_ATTEMPTS), + """At the documented attempt budget (_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS), the helper must raise CosmosHttpResponseError(503) -- not return a backoff. 
This is the only branch that surfaces an exception to the caller, and it is what lets the upstream Cosmos retry policy take @@ -717,8 +722,8 @@ def test_overlap_retry_raises_503_at_attempt_budget_exhaustion(self): but this is the focused unit-level guard.""" test_logger = logging.getLogger(__name__ + ".jitter_budget_test") with self.assertRaises(CosmosHttpResponseError) as ctx: - _handle_overlap_retry_decision( - overlap_attempt_count=_OVERLAP_RETRY_MAX_ATTEMPTS, + _handle_transient_snapshot_retry_decision( + retry_attempt_count=_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS, collection_link="dbs/db1/colls/coll1", logger=test_logger, ) @@ -736,7 +741,7 @@ def test_fetch_routing_map_recovers_after_transient_overlap(self): once and a consistent one on retry, the sync cache should populate cleanly on the second attempt — the customer sees no crash, no missing rows, just a brief stall.""" - # First call: Mode 2 payload (stale parent + children missing parent ref). + # First call: stale parent + children missing parent reference. bad_payload = [ {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, # stale parent @@ -825,8 +830,257 @@ def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): "or as a silent empty-result return." ) self.assertEqual( - call_count['n'], _OVERLAP_RETRY_MAX_ATTEMPTS, - "Should have made exactly _OVERLAP_RETRY_MAX_ATTEMPTS fetch attempts before giving up." + call_count['n'], _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS, + "Should have made exactly _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS fetch attempts before giving up." 
+ ) + + def test_fetch_routing_map_recovers_after_transient_gap(self): + """Mirror of the overlap-recovery test for the gap side: when the + gateway returns a snapshot with a hole in the key space once and a + consistent one on retry, the sync cache should populate cleanly on + the second attempt rather than letting the empty result reach + ``SmartRoutingMapProvider`` (which would crash with + ``AssertionError``).""" + # First call: gap between "80" and "A0" (parent removed, children not yet visible). + bad_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + # Second call: gap is gone, children have propagated. + good_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + + responses = [bad_payload, good_payload] + call_count = {'n': 0} + + client = MagicMock() + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + payload = responses[call_count['n']] if call_count['n'] < len(responses) else good_payload + call_count['n'] += 1 + headers = {http_constants.HttpHeaders.ETag: '"etag-{}"'.format(call_count['n'])} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + return iter(payload) + + client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + cache = PartitionKeyRangeCache(client) + + with patch('azure.cosmos._routing.routing_map_provider.time.sleep', return_value=None): + result = cache.get_routing_map("dbs/db1/colls/coll1", feed_options={}) + + self.assertIsNotNone( + result, + "Sync cache should populate after the transient gap clears on retry." 
+ ) + self.assertEqual(call_count['n'], 2, "Expected exactly one retry.") + ids = [r['id'] for r in result._orderedPartitionKeyRanges] + self.assertEqual(ids, ['L', '10/0', '10/1', 'R']) + + def test_fetch_routing_map_surfaces_503_after_persistent_gap(self): + """Mirror of the overlap-503 test for the gap side: a persistent gap + across the retry budget must surface as ``CosmosHttpResponseError(503)`` + rather than as an ``AssertionError("code bug: ...")`` from + ``SmartRoutingMapProvider``.""" + bad_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + call_count = {'n': 0} + client = MagicMock() + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + call_count['n'] += 1 + headers = {http_constants.HttpHeaders.ETag: '"etag-bad"'} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + return iter(bad_payload) + + client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + cache = PartitionKeyRangeCache(client) + + with patch('azure.cosmos._routing.routing_map_provider.time.sleep', return_value=None): + with self.assertRaises(CosmosHttpResponseError) as ctx: + cache.get_routing_map("dbs/db1/colls/coll1", feed_options={}) + + self.assertEqual(ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE) + self.assertEqual( + call_count['n'], _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS, + "Should have made exactly _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS fetch attempts before giving up." + ) + + def test_incremental_overlap_converts_to_incremental_merge_failed(self): + """Sync parity with + ``test_incremental_overlap_converts_to_incremental_merge_failed_async``: + + If the incremental-merge path produces overlapping ranges (e.g. 
the + delta contains a range whose key span overlaps an existing cached + range without either side declaring the other a parent), the + ``ValueError("Ranges overlap")`` raised by ``try_combine`` must NOT + escape to the caller. It must convert to ``_IncrementalMergeFailed`` + so the standard fallback path takes over (retry incremental once, + then full-load -- which has its own ``_OverlapDetected`` handler). + This is what guarantees the customer never observes a bare + ``ValueError`` from any of the validator's call sites.""" + + # Existing cached map: '0' covers ['', '80'] and '1' covers ['80', 'FF']. + previous_map = CollectionRoutingMap.CompleteRoutingMap( + [ + ({'id': '0', 'minInclusive': '', 'maxExclusive': '80'}, True), + ({'id': '1', 'minInclusive': '80', 'maxExclusive': 'FF'}, True), + ], + 'coll1', '"etag-prev"' + ) + + # Delta: + # - '0' re-declared with the same span (resolves via the existing + # ``known_range_info_by_id`` lookup -- no parents needed). + # - '2' with ``parents=['1']`` and a span that overlaps '0'. The + # parent-resolution loop succeeds because '1' is in the cache, + # so we reach ``try_combine``. Once '1' is removed as the gone + # parent, the merged map is { '0' ('', '80'), '2' ('40', 'FF') } + # -- '0' overlaps '2' on ['40', '80'], so ``is_complete_set_of_range`` + # raises ``ValueError("Ranges overlap: ...")`` from inside + # ``try_combine``. + bad_delta = [ + {'id': '0', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '2', 'minInclusive': '40', 'maxExclusive': 'FF', 'parents': ['1']}, + ] + + # The wrapper around try_combine must absorb the ValueError and convert + # it to _IncrementalMergeFailed for the caller's retry loop. 
+ with self.assertRaises(_IncrementalMergeFailed): + process_fetched_ranges( + bad_delta, previous_map, 'coll1', 'dbs/db1/colls/coll1', '"etag-new"' + ) + + def test_fetch_routing_map_mixed_overlap_and_gap_signals_share_retry_budget(self): + """The transient-snapshot retry budget is a single counter shared by + BOTH ``_OverlapDetected`` and ``_GapDetected`` signals -- it is not + per-signal-type. If the gateway alternates between overlap snapshots + and gap snapshots across attempts, the SDK must still surface a 503 + after the same ``_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS`` budget. + + Without this guarantee an alternating gateway could starve the + retry policy indefinitely (overlap, gap, overlap, gap, ...), masking + a real partition-routing problem as a slow stall.""" + # Overlap payload: stale parent '10' coexists with its children that + # lack a ``parents`` reference. Triggers ``_OverlapDetected``. + overlap_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + # Gap payload: ['80', 'A0') is missing entirely. Triggers ``_GapDetected``. 
+ gap_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + + responses = [overlap_payload, gap_payload, overlap_payload] + call_count = {'n': 0} + + client = MagicMock() + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + payload = responses[call_count['n']] if call_count['n'] < len(responses) else overlap_payload + call_count['n'] += 1 + headers = {http_constants.HttpHeaders.ETag: '"etag-mixed-{}"'.format(call_count['n'])} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + return iter(payload) + + client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + cache = PartitionKeyRangeCache(client) + + with patch('azure.cosmos._routing.routing_map_provider.time.sleep', return_value=None): + with self.assertRaises(CosmosHttpResponseError) as ctx: + cache.get_routing_map("dbs/db1/colls/coll1", feed_options={}) + + self.assertEqual( + ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE, + "Alternating overlap/gap signals must still surface as HTTP 503 once " + "the shared budget is exhausted." + ) + self.assertEqual( + call_count['n'], _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS, + "Overlap and gap signals must share one retry budget; alternating " + "between them must NOT extend the total number of attempts." + ) + + def test_fetch_routing_map_preserves_existing_cache_entry_when_force_refresh_surfaces_503(self): + """A 503 raised by ``_fetch_routing_map`` during a forced refresh must + NOT corrupt the existing cached routing map. 
Customers commonly chain + ``force_refresh=True`` after a 410-Gone retry: if the refresh itself + fails transiently we want subsequent reads to keep returning the + previously-cached map (a slightly stale answer is far better than a + cache wiped out by a transient gateway hiccup, which would make + every future query pay the full reload cost).""" + # Pre-populate the shared cache with a known-good routing map. + cached_map = _make_complete_routing_map("dbs/db1/colls/coll1", '"etag-cached"') + cache = PartitionKeyRangeCache(MagicMock()) + cache._collection_routing_map_by_item["dbs/db1/colls/coll1"] = cached_map + + # Wire the client to return an inconsistent (overlap) snapshot every + # time -- forces the retry loop to exhaust its budget and raise 503. + bad_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + headers = {http_constants.HttpHeaders.ETag: '"etag-bad"'} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + return iter(bad_payload) + + cache._document_client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + + with patch('azure.cosmos._routing.routing_map_provider.time.sleep', return_value=None): + with self.assertRaises(CosmosHttpResponseError) as ctx: + cache.get_routing_map( + "dbs/db1/colls/coll1", + feed_options={}, + force_refresh=True, + previous_routing_map=cached_map, + ) + + self.assertEqual(ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE) + + # Critical invariant: the previously-cached map must still be reachable + # 
via the same key. A 503 from a forced refresh must never evict good + # cache state -- otherwise every transient gateway blip would force the + # next reader to pay a cold-start cost. + self.assertIs( + cache._collection_routing_map_by_item.get("dbs/db1/colls/coll1"), cached_map, + "Cached routing map must be preserved after a 503 from forced refresh -- " + "transient inconsistencies must not evict good cache state." + ) + self.assertEqual( + cache._collection_routing_map_by_item["dbs/db1/colls/coll1"].change_feed_etag, + '"etag-cached"', + "Cached ETag must remain the pre-503 value (no partial overwrite)." ) diff --git a/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit_async.py b/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit_async.py index 7a9465a42baf..46c048baad2a 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit_async.py +++ b/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit_async.py @@ -13,15 +13,14 @@ import pytest -from azure.cosmos.aio import CosmosClient # noqa: F401 - needed to resolve circular imports from azure.cosmos._routing.aio.routing_map_provider import ( PartitionKeyRangeCache, - _OVERLAP_RETRY_MAX_ATTEMPTS, ) from azure.cosmos._routing.collection_routing_map import CollectionRoutingMap from azure.cosmos._routing._routing_map_provider_common import ( process_fetched_ranges, _IncrementalMergeFailed, + _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS, ) from azure.cosmos import http_constants from azure.cosmos.exceptions import CosmosHttpResponseError @@ -220,21 +219,25 @@ async def test_fetch_routing_map_empty_incremental_response_async(self): self.assertEqual(ids, ['0'], "Ranges should be preserved from previous map") self.assertEqual(result.change_feed_etag, '"etag-new"', "ETag should be updated") - async def test_fetch_routing_map_empty_full_load_returns_none_async(self): - """_fetch_routing_map should return None when a full load (no previous - map) returns zero ranges — this means the 
service returned nothing.""" + async def test_fetch_routing_map_empty_full_load_raises_503_after_budget_async(self): + """When a full load (no previous map) repeatedly returns zero ranges, + ``_fetch_routing_map`` should retry up to the overlap budget and then + surface ``CosmosHttpResponseError(status_code=503)`` rather than + returning ``None``. Empty ranges hit the same ``_GapDetected`` path as + a gap in the key space; without retry-then-503 the empty result + would later crash ``SmartRoutingMapProvider`` with ``AssertionError``.""" client = _make_mock_async_client(ranges=[], response_etag='"etag"') cache = PartitionKeyRangeCache(client) - result = await cache._fetch_routing_map( - collection_link="dbs/db1/colls/coll1", - collection_id="dbs/db1/colls/coll1", - previous_routing_map=None, - feed_options={} - ) - - self.assertIsNone(result, "Full load with empty ranges should return None") + with self.assertRaises(CosmosHttpResponseError) as ctx: + await cache._fetch_routing_map( + collection_link="dbs/db1/colls/coll1", + collection_id="dbs/db1/colls/coll1", + previous_routing_map=None, + feed_options={} + ) + self.assertEqual(ctx.exception.status_code, 503) async def test_get_previous_routing_map_exact_key_finds_entry_async(self): @@ -502,12 +505,12 @@ async def async_gen(): # # These cover the integration between the builder (which converts the # underlying ValueError("Ranges overlap") into the _OverlapDetected - # sentinel) and _fetch_routing_map (which catches the sentinel, sleeps - # briefly, and re-fetches). The customer-observed failure mode is that - # one paginated /pkranges response can be internally inconsistent (e.g. - # a stale parent appearing alongside its children with missing parent - # references); a small retry budget with backoff gives the gateway time - # to converge before the SDK surfaces a transient HTTP 503. + # signal exception) and _fetch_routing_map (which catches the signal, + # sleeps briefly, and re-fetches). 
The customer-observed failure scenario + # is that one paginated /pkranges response can be internally inconsistent + # (e.g. a stale parent appearing alongside its children with missing + # parent references); a small retry budget with backoff gives the gateway + # time to converge before the SDK surfaces a transient HTTP 503. # ========================================================================== async def test_fetch_routing_map_recovers_after_transient_overlap_async(self): @@ -515,7 +518,7 @@ async def test_fetch_routing_map_recovers_after_transient_overlap_async(self): once and a consistent one on retry, the cache should populate cleanly with the consistent data on the second attempt — the customer sees no crash, no missing rows, just a brief stall.""" - # First call: Mode 2 payload (stale parent + children missing parent ref) → triggers _OverlapDetected. + # First call: stale parent + children missing parent reference → triggers _OverlapDetected. bad_payload = [ {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, # stale parent @@ -630,9 +633,111 @@ async def _no_sleep(_seconds): ) # We should have exhausted the full retry budget (3 attempts by default). self.assertEqual( - call_count['n'], _OVERLAP_RETRY_MAX_ATTEMPTS, - "Should have made exactly _OVERLAP_RETRY_MAX_ATTEMPTS fetch attempts before giving up." + call_count['n'], _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS, + "Should have made exactly _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS fetch attempts before giving up." 
+ ) + + async def test_fetch_routing_map_recovers_after_transient_gap_async(self): + """Mirror of the overlap-recovery test for the gap side: when the + gateway returns a snapshot with a hole in the key space once and a + consistent one on retry, the async cache should populate cleanly on + the second attempt rather than letting the empty result reach + ``SmartRoutingMapProvider`` (which would crash with + ``AssertionError``).""" + bad_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + good_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + + responses = [bad_payload, good_payload] + call_count = {'n': 0} + + client = MagicMock() + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + payload = responses[call_count['n']] if call_count['n'] < len(responses) else good_payload + call_count['n'] += 1 + headers = {http_constants.HttpHeaders.ETag: '"etag-{}"'.format(call_count['n'])} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + + async def async_gen(): + for r in payload: + yield r + + return async_gen() + + client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + cache = PartitionKeyRangeCache(client) + + async def _no_sleep(_seconds): + return None + + with patch( + 'azure.cosmos._routing.aio.routing_map_provider.asyncio.sleep', + new=_no_sleep, + ): + result = await cache.get_routing_map("dbs/db1/colls/coll1", feed_options={}) + + self.assertIsNotNone( + result, + "Async cache should populate after the transient gap clears on retry." 
) + self.assertEqual(call_count['n'], 2, "Expected exactly one retry.") + ids = [r['id'] for r in result._orderedPartitionKeyRanges] + self.assertEqual(ids, ['L', '10/0', '10/1', 'R']) + + async def test_fetch_routing_map_surfaces_503_after_persistent_gap_async(self): + """Mirror of the overlap-503 test for the gap side: a persistent gap + across the retry budget must surface as ``CosmosHttpResponseError(503)`` + rather than as an ``AssertionError("code bug: ...")`` from + ``SmartRoutingMapProvider``.""" + bad_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + call_count = {'n': 0} + client = MagicMock() + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + call_count['n'] += 1 + headers = {http_constants.HttpHeaders.ETag: '"etag-bad"'} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + + async def async_gen(): + for r in bad_payload: + yield r + + return async_gen() + + client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + cache = PartitionKeyRangeCache(client) + + async def _no_sleep(_seconds): + return None + + with patch( + 'azure.cosmos._routing.aio.routing_map_provider.asyncio.sleep', + new=_no_sleep, + ): + with self.assertRaises(CosmosHttpResponseError) as ctx: + await cache.get_routing_map("dbs/db1/colls/coll1", feed_options={}) + + self.assertEqual(ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE) + self.assertEqual(call_count['n'], _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS) async def test_incremental_overlap_converts_to_incremental_merge_failed_async(self): """If the incremental-merge path produces overlapping ranges (e.g. 
the From d439dad20f6603f920987dad447430acb6c1ff3f Mon Sep 17 00:00:00 2001 From: dibahlfi <106994927+dibahlfi@users.noreply.github.com> Date: Fri, 22 May 2026 23:50:43 -0500 Subject: [PATCH 3/3] fixing tests --- .../_routing/_routing_map_provider_common.py | 11 +- .../_routing/aio/routing_map_provider.py | 28 ++- .../cosmos/_routing/routing_map_provider.py | 27 ++- .../routing/test_routing_map_provider.py | 28 ++- .../test_routing_map_provider_async.py | 31 +++- .../tests/test_container_rid_header_unit.py | 30 +-- .../test_container_rid_header_unit_async.py | 33 ++-- .../tests/test_partition_split_query.py | 29 ++- .../tests/test_partition_split_query_async.py | 37 ++-- .../tests/test_routing_map_provider_unit.py | 33 ++-- .../test_routing_map_provider_unit_async.py | 172 +++++++++++++++++- 11 files changed, 333 insertions(+), 126 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/_routing_map_provider_common.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/_routing_map_provider_common.py index 02959216860a..232d1c8fc288 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/_routing_map_provider_common.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/_routing_map_provider_common.py @@ -33,11 +33,16 @@ from .. import _base, http_constants from ..exceptions import CosmosHttpResponseError -from .collection_routing_map import ( +# ``_OverlapDetected`` and ``_GapDetected`` are imported (but not referenced +# inside this module) so that provider modules and tests can import them from +# this single module instead of reaching into ``collection_routing_map`` +# directly. Pylint reports ``unused-import`` on the ``from`` line as a whole +# (not on the individual names), so the disable must live on that line. 
+from .collection_routing_map import ( # pylint: disable=unused-import CollectionRoutingMap, _build_routing_map_from_ranges, - _OverlapDetected, # noqa: F401 # re-exported for provider modules and tests - _GapDetected, # noqa: F401 # re-exported for provider modules and tests + _OverlapDetected, + _GapDetected, ) from . import routing_range from .routing_range import ( diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py index 4d4c0ca84e80..1604b4734b48 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py @@ -312,9 +312,12 @@ async def get_routing_map( **kwargs ) - # Update the cache. - if new_routing_map: - self._collection_routing_map_by_item[collection_id] = new_routing_map + # ``_fetch_routing_map`` always returns a populated + # ``CollectionRoutingMap`` on success and raises otherwise, + # so no defensive None-check is needed; one + # would only mask a future regression by silently leaving + # the cache empty instead of surfacing the failure. + self._collection_routing_map_by_item[collection_id] = new_routing_map return self._collection_routing_map_by_item.get(collection_id) @@ -326,7 +329,7 @@ async def _fetch_routing_map( previous_routing_map: Optional[CollectionRoutingMap], feed_options: Optional[Dict[str, Any]], **kwargs - ) -> Optional[CollectionRoutingMap]: + ) -> CollectionRoutingMap: """Fetches or updates the routing map using an incremental change feed. This method handles both the initial loading of a collection's routing @@ -336,15 +339,26 @@ async def _fetch_routing_map( of inconsistencies during an incremental update, it automatically falls back to a full refresh. + Always returns a populated :class:`CollectionRoutingMap` on success.
+ Failure modes raise an exception rather than returning ``None``: + ``CosmosHttpResponseError`` for the underlying network call (including + the transient HTTP 503 raised once the snapshot-inconsistency retry + budget is exhausted), or the internal ``_IncrementalMergeFailed`` + signal when the incremental-merge path cannot make progress and there + is no previous map to fall back on. + :param str collection_link: The link to the collection. :param str collection_id: The ID of the collection. :param previous_routing_map: The last known routing map for incremental updates. :type previous_routing_map: azure.cosmos.routing.collection_routing_map.CollectionRoutingMap or None :param feed_options: Options for the change feed request. :type feed_options: dict or None - :return: The updated or newly created CollectionRoutingMap, or None if the update fails. - :rtype: azure.cosmos.routing.collection_routing_map.CollectionRoutingMap or None - :raises CosmosHttpResponseError: If the underlying request to fetch ranges fails. + :return: The updated or newly created CollectionRoutingMap. + :rtype: azure.cosmos.routing.collection_routing_map.CollectionRoutingMap + :raises CosmosHttpResponseError: If the underlying ``/pkranges`` fetch + fails, or if the snapshot-inconsistency retry budget is + exhausted (surfaced as HTTP 503 so the upstream retry policy can + take over).
""" current_previous_map = previous_routing_map incomplete_attempt_count = 0 diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py index 3b78cad9e2d4..2772552ce8f5 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py @@ -285,9 +285,12 @@ def get_routing_map( feed_options, **kwargs ) - - if new_routing_map: - self._collection_routing_map_by_item[collection_id] = new_routing_map + # ``_fetch_routing_map`` always returns a populated + # ``CollectionRoutingMap`` on success and raises otherwise -- + # No defensive None-check needed; one + # would only mask a future regression by silently leaving + # the cache empty instead of surfacing the failure. + self._collection_routing_map_by_item[collection_id] = new_routing_map return self._collection_routing_map_by_item.get(collection_id) @@ -300,7 +303,7 @@ def _fetch_routing_map( previous_routing_map: Optional[CollectionRoutingMap], feed_options: Optional[Dict[str, Any]], **kwargs - ) -> Optional[CollectionRoutingMap]: + ) -> CollectionRoutingMap: """Fetches or updates the routing map using an incremental change feed. @@ -311,14 +314,26 @@ def _fetch_routing_map( of inconsistencies during an incremental update, it automatically falls back to a full refresh. + Always returns a populated :class:`CollectionRoutingMap` on success. + Failure modes raise an exception rather than returning ``None``: + ``CosmosHttpResponseError`` for the underlying network call (including + the transient HTTP 503 raised once the snapshot-inconsistency retry + budget is exhausted), or the internal ``_IncrementalMergeFailed`` + signal when the incremental-merge path cannot make progress and there + is no previous map to fall back on. + :param str collection_link: The link to the collection. 
:param str collection_id: The unique identifier of the collection. :param previous_routing_map: The routing map to be updated. If None, a full load is performed. :type previous_routing_map: azure.cosmos.routing.collection_routing_map.CollectionRoutingMap :param feed_options: Options for the change feed request. :type feed_options: dict or None - :return: The new or updated CollectionRoutingMap, or None if retrieval fails. - :rtype: azure.cosmos.routing.collection_routing_map.CollectionRoutingMap or None + :return: The new or updated CollectionRoutingMap. + :rtype: azure.cosmos.routing.collection_routing_map.CollectionRoutingMap + :raises CosmosHttpResponseError: If the underlying ``/pkranges`` fetch + fails, or if the snapshot-inconsistency retry budget is + exhausted (surfaced as HTTP 503 so the upstream retry policy can + take over). """ current_previous_map = previous_routing_map incomplete_attempt_count = 0 diff --git a/sdk/cosmos/azure-cosmos/tests/routing/test_routing_map_provider.py b/sdk/cosmos/azure-cosmos/tests/routing/test_routing_map_provider.py index 56f6637ff454..153588350305 100644 --- a/sdk/cosmos/azure-cosmos/tests/routing/test_routing_map_provider.py +++ b/sdk/cosmos/azure-cosmos/tests/routing/test_routing_map_provider.py @@ -12,8 +12,9 @@ from azure.cosmos import http_constants from typing import Optional, Mapping, Any -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import threading +from azure.cosmos.exceptions import CosmosHttpResponseError @pytest.mark.cosmosEmulator class TestRoutingMapProvider(unittest.TestCase): @@ -336,14 +337,18 @@ def test_is_cache_stale_etag_logic(self): mock_map2.change_feed_etag = cached_map.change_feed_etag self.assertFalse(provider._is_cache_stale(collection_id, mock_map2)) - def test_fetch_routing_map_full_load_with_incomplete_ranges_returns_none(self): - """When a full load (previous_routing_map=None) returns gapped ranges, returns None immediately.""" + def
test_fetch_routing_map_full_load_with_incomplete_ranges_surfaces_503(self): + """When a full load (previous_routing_map=None) repeatedly returns + gapped ranges, the retry budget should be exhausted and the provider + should surface a retryable HTTP 503.""" incomplete_ranges = [ {'id': '0', 'minInclusive': '', 'maxExclusive': '80'} # Gap from 80 to FF ] + call_count = {'count': 0} class IncompleteClient: def _ReadPartitionKeyRanges(self, _collection_link, feed_options=None, **kwargs): + call_count['count'] += 1 TestRoutingMapProvider._capture_internal_headers(kwargs, '"incomplete-etag"') return incomplete_ranges @@ -352,13 +357,16 @@ def _ReadPartitionKeyRanges(self, _collection_link, feed_options=None, **kwargs) collection_link = "dbs/db/colls/container" collection_id = _base.GetResourceIdOrFullNameFromLink(collection_link) - result = provider._fetch_routing_map( - collection_link=collection_link, - collection_id=collection_id, - previous_routing_map=None, - feed_options={}, - ) - self.assertIsNone(result, "Should return None when full load produces incomplete ranges") + with patch('azure.cosmos._routing.routing_map_provider.time.sleep', return_value=None): + with self.assertRaises(CosmosHttpResponseError) as ctx: + provider._fetch_routing_map( + collection_link=collection_link, + collection_id=collection_id, + previous_routing_map=None, + feed_options={}, + ) + self.assertEqual(ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE) + self.assertEqual(call_count['count'], 3) def test_fetch_routing_map_incremental_with_parents(self): """Incremental update correctly merges child ranges that reference a parent.""" diff --git a/sdk/cosmos/azure-cosmos/tests/routing/test_routing_map_provider_async.py b/sdk/cosmos/azure-cosmos/tests/routing/test_routing_map_provider_async.py index 5d7408bb6216..983aa12313e5 100644 --- a/sdk/cosmos/azure-cosmos/tests/routing/test_routing_map_provider_async.py +++ 
b/sdk/cosmos/azure-cosmos/tests/routing/test_routing_map_provider_async.py @@ -12,7 +12,8 @@ from azure.cosmos import http_constants from typing import Optional, Mapping, Any -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch +from azure.cosmos.exceptions import CosmosHttpResponseError @pytest.mark.cosmosEmulator @@ -316,14 +317,18 @@ async def test_is_cache_stale_etag_logic_async(self): mock_map2.change_feed_etag = cached_map.change_feed_etag self.assertFalse(provider._is_cache_stale(collection_id, mock_map2)) - async def test_fetch_routing_map_full_load_with_incomplete_ranges_returns_none_async(self): - """When a full load (previous_routing_map=None) returns gapped ranges, returns None immediately.""" + async def test_fetch_routing_map_full_load_with_incomplete_ranges_surfaces_503_async(self): + """When a full load (previous_routing_map=None) repeatedly returns + gapped ranges, the retry budget should be exhausted and the provider + should surface a retryable HTTP 503.""" incomplete_ranges = [ {'id': '0', 'minInclusive': '', 'maxExclusive': '80'} # Gap from 80 to FF ] + call_count = {'count': 0} class IncompleteClient: def _ReadPartitionKeyRanges(self, _collection_link, feed_options=None, **kwargs): + call_count['count'] += 1 TestRoutingMapProviderAsync._capture_internal_headers(kwargs, '"incomplete-etag"') async def _gen(): @@ -337,13 +342,19 @@ async def _gen(): collection_link = "dbs/db/colls/container" collection_id = _base.GetResourceIdOrFullNameFromLink(collection_link) - result = await provider._fetch_routing_map( - collection_link=collection_link, - collection_id=collection_id, - previous_routing_map=None, - feed_options={}, - ) - self.assertIsNone(result, "Should return None when full load produces incomplete ranges") + async def _no_sleep(_seconds): + return None + + with patch('azure.cosmos._routing.aio.routing_map_provider.asyncio.sleep', new=_no_sleep): + with self.assertRaises(CosmosHttpResponseError) as ctx: + await 
provider._fetch_routing_map( + collection_link=collection_link, + collection_id=collection_id, + previous_routing_map=None, + feed_options={}, + ) + self.assertEqual(ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE) + self.assertEqual(call_count['count'], 3) async def test_fetch_routing_map_incremental_with_parents_async(self): """Incremental update correctly merges child ranges that reference a parent.""" diff --git a/sdk/cosmos/azure-cosmos/tests/test_container_rid_header_unit.py b/sdk/cosmos/azure-cosmos/tests/test_container_rid_header_unit.py index 8ea0bda6dbe3..04214d6c3f9a 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_container_rid_header_unit.py +++ b/sdk/cosmos/azure-cosmos/tests/test_container_rid_header_unit.py @@ -10,6 +10,7 @@ import unittest from typing import Optional, Mapping, Any +from unittest.mock import patch from azure.cosmos._routing import routing_range from azure.cosmos._routing.routing_map_provider import ( PartitionKeyRangeCache, @@ -17,6 +18,7 @@ ) from azure.cosmos._routing.collection_routing_map import CollectionRoutingMap from azure.cosmos import _base, http_constants +from azure.cosmos.exceptions import CosmosHttpResponseError # ===================================================================== @@ -333,11 +335,10 @@ def _ReadPartitionKeyRanges(self, _collection_link, feed_options=None, **kwargs) "returning a delta instead of the complete set of ranges" ) - def test_full_load_with_incomplete_ranges_returns_none(self): - """When a full load (no previous routing map) returns ranges with gaps, - CompleteRoutingMap returns None. 
The method must return None immediately - without retrying — there is no incremental state to fall back from, and - repeating the identical request would produce the same result.""" + def test_full_load_with_incomplete_ranges_surfaces_503(self): + """When a full load (no previous routing map) repeatedly returns gapped + ranges, the retry budget should be exhausted and _fetch_routing_map + should surface a retryable HTTP 503.""" class IncompleteRangesClient: """Returns ranges with a gap — CompleteRoutingMap will return None.""" @@ -354,16 +355,15 @@ def _ReadPartitionKeyRanges(self, _collection_link, feed_options=None, **kwargs) client = IncompleteRangesClient() cache = PartitionKeyRangeCache(client) - result = cache._fetch_routing_map( - COLLECTION_LINK, - _base.GetResourceIdOrFullNameFromLink(COLLECTION_LINK), - None, # full load (no previous map) - {}, - ) - assert result is None, ( - "Full load with incomplete ranges must return None " - "instead of retrying infinitely" - ) + with patch('azure.cosmos._routing.routing_map_provider.time.sleep', return_value=None): + with self.assertRaises(CosmosHttpResponseError) as ctx: + cache._fetch_routing_map( + COLLECTION_LINK, + _base.GetResourceIdOrFullNameFromLink(COLLECTION_LINK), + None, # full load (no previous map) + {}, + ) + self.assertEqual(ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE) def test_incremental_fallback_to_full_load_succeeds(self): """When an incremental (change-feed) update fails because a returned diff --git a/sdk/cosmos/azure-cosmos/tests/test_container_rid_header_unit_async.py b/sdk/cosmos/azure-cosmos/tests/test_container_rid_header_unit_async.py index 85b90785fbfc..853545d64a62 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_container_rid_header_unit_async.py +++ b/sdk/cosmos/azure-cosmos/tests/test_container_rid_header_unit_async.py @@ -10,6 +10,7 @@ import unittest from typing import Optional, Mapping, Any, Dict +from unittest.mock import patch from 
azure.cosmos._routing import routing_range from azure.cosmos._routing.aio.routing_map_provider import ( PartitionKeyRangeCache, @@ -17,6 +18,7 @@ ) from azure.cosmos._routing.collection_routing_map import CollectionRoutingMap from azure.cosmos import _base, http_constants +from azure.cosmos.exceptions import CosmosHttpResponseError # ===================================================================== @@ -293,11 +295,10 @@ async def _ReadPartitionKeyRanges(self, collection_link, feed_options=None, **kw "returning a delta instead of the complete set of ranges" ) - async def test_full_load_with_incomplete_ranges_returns_none_async(self): - """When a full load (no previous routing map) returns ranges with gaps, - CompleteRoutingMap returns None. The method must return None immediately - without retrying — there is no incremental state to fall back from, and - repeating the identical request would produce the same result.""" + async def test_full_load_with_incomplete_ranges_surfaces_503_async(self): + """When a full load (no previous routing map) repeatedly returns gapped + ranges, the retry budget should be exhausted and _fetch_routing_map + should surface a retryable HTTP 503.""" class IncompleteRangesClient: async def _ReadPartitionKeyRanges(self, collection_link, feed_options=None, **kwargs): @@ -314,16 +315,18 @@ async def _ReadPartitionKeyRanges(self, collection_link, feed_options=None, **kw client = IncompleteRangesClient() cache = PartitionKeyRangeCache(client) - result = await cache._fetch_routing_map( - COLLECTION_LINK, - _base.GetResourceIdOrFullNameFromLink(COLLECTION_LINK), - None, # full load (no previous map) - {}, - ) - assert result is None, ( - "Full load with incomplete ranges must return None " - "instead of retrying infinitely" - ) + async def _no_sleep(_seconds): + return None + + with patch('azure.cosmos._routing.aio.routing_map_provider.asyncio.sleep', new=_no_sleep): + with self.assertRaises(CosmosHttpResponseError) as ctx: + await 
cache._fetch_routing_map( + COLLECTION_LINK, + _base.GetResourceIdOrFullNameFromLink(COLLECTION_LINK), + None, # full load (no previous map) + {}, + ) + self.assertEqual(ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE) async def test_incremental_fallback_to_full_load_succeeds_async(self): """When an incremental (change-feed) update fails because a returned diff --git a/sdk/cosmos/azure-cosmos/tests/test_partition_split_query.py b/sdk/cosmos/azure-cosmos/tests/test_partition_split_query.py index 73e0a21085cb..8985a72825f2 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_partition_split_query.py +++ b/sdk/cosmos/azure-cosmos/tests/test_partition_split_query.py @@ -608,9 +608,8 @@ def test_full_refresh_fallback_stops_infinite_recursion(self): the service returns an incomplete set of partition ranges. When a full load is performed (previous_routing_map=None) and the service - returns gapped ranges, _fetch_routing_map must return None immediately - - there is no incremental state to fall back from, and repeating the - identical request would produce the same result.""" + returns gapped ranges, _fetch_routing_map should surface a retryable + HTTP 503 after exhausting the bounded retry budget.""" container_id = 'test_fallback_guard_' + str(uuid.uuid4()) self.key_database.create_container( id=container_id, @@ -644,19 +643,17 @@ def mock_read_ranges(*args, **kwargs): '_ReadPartitionKeyRanges', side_effect=mock_read_ranges ): - # Full load with incomplete ranges should return None immediately - result = provider._fetch_routing_map( - collection_link=collection_link, - collection_id=collection_id, - previous_routing_map=None, - feed_options={}, - ) - - # Should return None instead of recursing infinitely - assert result is None, \ - "_fetch_routing_map should return None when full load produces incomplete ranges" - - print("Validated: full load with incomplete ranges returns None without recursion") + with 
patch('azure.cosmos._routing.routing_map_provider.time.sleep', return_value=None): + with self.assertRaises(CosmosHttpResponseError) as ctx: + provider._fetch_routing_map( + collection_link=collection_link, + collection_id=collection_id, + previous_routing_map=None, + feed_options={}, + ) + self.assertEqual(ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE) + + print("Validated: full load with incomplete ranges surfaces retryable HTTP 503") finally: self.key_database.delete_container(container_id) diff --git a/sdk/cosmos/azure-cosmos/tests/test_partition_split_query_async.py b/sdk/cosmos/azure-cosmos/tests/test_partition_split_query_async.py index 990d57195de1..fb701f1c613a 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_partition_split_query_async.py +++ b/sdk/cosmos/azure-cosmos/tests/test_partition_split_query_async.py @@ -14,6 +14,7 @@ from azure.cosmos import http_constants from azure.cosmos import _base from azure.cosmos.aio import CosmosClient, DatabaseProxy, ContainerProxy +from azure.cosmos.exceptions import CosmosHttpResponseError async def run_queries(container, iterations): ret_list = [] @@ -592,12 +593,13 @@ async def test_is_cache_stale_etag_comparison_async(self): finally: await self.key_database.delete_container(container_id) - async def test_full_load_with_incomplete_ranges_returns_none_async(self): + async def test_full_load_with_incomplete_ranges_surfaces_503_async(self): """ - Validates that a full load with incomplete ranges returns None immediately. + Validates that a full load with incomplete ranges surfaces a retryable + HTTP 503 after exhausting the bounded retry budget. When a full load is performed (previous_routing_map=None) and the service - returns gapped ranges, _fetch_routing_map should return None without retrying - - there is no incremental state to fall back from. + returns gapped ranges, _fetch_routing_map should not leak internal + map-construction failures to callers. 
""" container_id = 'test_fallback_guard_async_' + str(uuid.uuid4()) await self.key_database.create_container( @@ -632,19 +634,20 @@ async def mock_read_ranges(*args, **kwargs): '_ReadPartitionKeyRanges', side_effect=mock_read_ranges ): - # Full load with incomplete ranges should return None immediately - result = await provider._fetch_routing_map( - collection_link=collection_link, - collection_id=collection_id, - previous_routing_map=None, - feed_options={}, - ) - - # Should return None instead of recursing infinitely - assert result is None, \ - "_fetch_routing_map should return None when full load produces incomplete ranges" - - print("Validated: full load with incomplete ranges returns None without recursion") + async def _no_sleep(_seconds): + return None + + with patch('azure.cosmos._routing.aio.routing_map_provider.asyncio.sleep', new=_no_sleep): + with self.assertRaises(CosmosHttpResponseError) as ctx: + await provider._fetch_routing_map( + collection_link=collection_link, + collection_id=collection_id, + previous_routing_map=None, + feed_options={}, + ) + self.assertEqual(ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE) + + print("Validated: full load with incomplete ranges surfaces retryable HTTP 503") finally: await self.key_database.delete_container(container_id) diff --git a/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit.py b/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit.py index cfcfb046cb0c..b95ffd99d296 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit.py +++ b/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit.py @@ -632,22 +632,14 @@ def read_pk_ranges_cascading(collection_link, options, response_hook=None, **kwa self.assertEqual(ids, ['4', '5', '3', '1']) self.assertEqual(result.change_feed_etag, '"etag-old"') - # ========================================================================== - # End-to-end retry-loop tests for transient /pkranges snapshot 
inconsistency - # on the SYNC provider. Mirrors the async equivalents to guarantee both - # providers stay in lockstep on this contract: the cache-load pipeline - # never lets a bare ValueError("Ranges overlap") escape -- it either - # recovers on retry, or surfaces a typed CosmosHttpResponseError(503) the - # upstream retry policy already knows how to handle. - # ========================================================================== # ========================================================================== - # Unit tests for the overlap-retry policy helper. These pin the contract - # that the returned backoff is always within the deterministic upper bound - # (so the worst-case-wall-time guarantee in the public docs holds) and that - # jitter is actually applied (so concurrent retriers in different Cosmos - # clients -- e.g. several PySpark workers in the same process -- do not - # retry in lockstep on the same gateway node). + # Helper-level retry-policy unit tests. + # + # These target only the pure helper that computes retry backoff / 503 + # escalation (no cache object, no fetch loop). They pin the contract that + # backoff stays within the deterministic upper bound and that jitter is + # actually applied so concurrent retriers do not retry in lockstep. # ========================================================================== def test_overlap_retry_backoff_is_within_deterministic_upper_bound(self): @@ -733,7 +725,12 @@ def test_overlap_retry_raises_503_at_attempt_budget_exhaustion(self): ) # ========================================================================== - # End-to-end retry-loop tests below ↓ + # Provider retry-loop behavior tests (mocked integration path). + # + # These exercise the sync provider's full fetch/retry loop with mocked + # /pkranges payloads. They verify the full path contract: transient + # inconsistencies either recover on retry or surface typed HTTP 503, and + # never leak raw ``ValueError("Ranges overlap")`` to callers. 
# ========================================================================== def test_fetch_routing_map_recovers_after_transient_overlap(self): @@ -1025,9 +1022,9 @@ def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): def test_fetch_routing_map_preserves_existing_cache_entry_when_force_refresh_surfaces_503(self): """A 503 raised by ``_fetch_routing_map`` during a forced refresh must - NOT corrupt the existing cached routing map. Customers commonly chain - ``force_refresh=True`` after a 410-Gone retry: if the refresh itself - fails transiently we want subsequent reads to keep returning the + NOT corrupt the existing cached routing map. The SDK commonly issues + ``force_refresh=True`` during 410/Gone recovery paths; if that refresh + itself fails transiently we want subsequent reads to keep returning the previously-cached map (a slightly stale answer is far better than a cache wiped out by a transient gateway hiccup, which would make every future query pay the full reload cost).""" diff --git a/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit_async.py b/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit_async.py index 46c048baad2a..ff121d3da531 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit_async.py +++ b/sdk/cosmos/azure-cosmos/tests/test_routing_map_provider_unit_async.py @@ -501,16 +501,12 @@ async def async_gen(): self.assertEqual(result.change_feed_etag, '"etag-old"') # ========================================================================== - # End-to-end retry-loop tests for transient /pkranges snapshot inconsistency. + # Provider retry-loop behavior tests (mocked integration path). # - # These cover the integration between the builder (which converts the - # underlying ValueError("Ranges overlap") into the _OverlapDetected - # signal exception) and _fetch_routing_map (which catches the signal, - # sleeps briefly, and re-fetches). 
The customer-observed failure scenario - # is that one paginated /pkranges response can be internally inconsistent - # (e.g. a stale parent appearing alongside its children with missing - # parent references); a small retry budget with backoff gives the gateway - # time to converge before the SDK surfaces a transient HTTP 503. + # These cover integration between builder signaling (overlap/gap) and the + # async provider fetch/retry loop. With mocked /pkranges payloads we verify + # the full path contract: transient inconsistencies either recover on retry + # or surface typed HTTP 503, and never leak raw ``ValueError`` failures. # ========================================================================== async def test_fetch_routing_map_recovers_after_transient_overlap_async(self): @@ -781,6 +777,164 @@ async def test_incremental_overlap_converts_to_incremental_merge_failed_async(se bad_delta, previous_map, 'coll1', 'dbs/db1/colls/coll1', '"etag-new"' ) + async def test_fetch_routing_map_mixed_overlap_and_gap_signals_share_retry_budget_async(self): + """Async mirror of + ``test_fetch_routing_map_mixed_overlap_and_gap_signals_share_retry_budget``. + + The transient-snapshot retry budget is a single counter shared by + BOTH ``_OverlapDetected`` and ``_GapDetected`` signals -- it is not + per-signal-type. If the gateway alternates between overlap snapshots + and gap snapshots across attempts, the SDK must still surface a 503 + after the same ``_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS`` budget. 
+ + Async coverage is independent of the sync test because the async + ``_fetch_routing_map`` has its own ``except (_OverlapDetected, + _GapDetected)`` block and increments its own + ``inconsistency_attempt_count`` local under ``async with`` lock + scoping -- a future refactor that, for example, reset the counter + on signal-type change, or that lost the counter across an + ``await`` boundary, would only be caught by exercising the async + codepath end-to-end.""" + # Overlap payload: stale parent '10' coexists with its children that + # lack a ``parents`` reference. Triggers ``_OverlapDetected``. + overlap_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + # Gap payload: ['80', 'A0') is missing entirely. Triggers ``_GapDetected``. 
+ gap_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + + responses = [overlap_payload, gap_payload, overlap_payload] + call_count = {'n': 0} + + client = MagicMock() + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + payload = responses[call_count['n']] if call_count['n'] < len(responses) else overlap_payload + call_count['n'] += 1 + headers = {http_constants.HttpHeaders.ETag: '"etag-mixed-{}"'.format(call_count['n'])} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + + async def async_gen(): + for r in payload: + yield r + + return async_gen() + + client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + cache = PartitionKeyRangeCache(client) + + async def _no_sleep(_seconds): + return None + + with patch( + 'azure.cosmos._routing.aio.routing_map_provider.asyncio.sleep', + new=_no_sleep, + ): + with self.assertRaises(CosmosHttpResponseError) as ctx: + await cache.get_routing_map("dbs/db1/colls/coll1", feed_options={}) + + self.assertEqual( + ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE, + "Alternating overlap/gap signals must still surface as HTTP 503 once " + "the shared budget is exhausted." + ) + self.assertEqual( + call_count['n'], _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS, + "Overlap and gap signals must share one retry budget; alternating " + "between them must NOT extend the total number of attempts." + ) + + async def test_fetch_routing_map_preserves_existing_cache_entry_when_force_refresh_surfaces_503_async(self): + """Async mirror of + ``test_fetch_routing_map_preserves_existing_cache_entry_when_force_refresh_surfaces_503``. + + A 503 raised by ``_fetch_routing_map`` during a forced refresh must + NOT corrupt the existing cached routing map. 
The SDK commonly issues + ``force_refresh=True`` during 410/Gone recovery paths; if that refresh + itself fails transiently we want subsequent reads to keep returning + the previously-cached map (a slightly stale answer is far better than + a cache wiped out by a transient gateway hiccup). + + Async coverage is independent of the sync test because the async + ``get_routing_map`` writes to the cache from inside an ``async with`` + block. If a future refactor moved the write before the inner + ``try``/``except``, or if exception unwinding through the async + context manager somehow cleared the entry, only the async-level + end-to-end test would catch it.""" + # Pre-populate the shared cache with a known-good routing map. + cached_map = _make_complete_routing_map("dbs/db1/colls/coll1", '"etag-cached"') + cache = PartitionKeyRangeCache(MagicMock()) + cache._collection_routing_map_by_item["dbs/db1/colls/coll1"] = cached_map + + # Wire the client to return an inconsistent (overlap) snapshot every + # time -- forces the retry loop to exhaust its budget and raise 503. 
+ bad_payload = [ + {'id': 'L', 'minInclusive': '', 'maxExclusive': '80'}, + {'id': '10', 'minInclusive': '80', 'maxExclusive': 'A0'}, + {'id': '10/0', 'minInclusive': '80', 'maxExclusive': '90'}, + {'id': '10/1', 'minInclusive': '90', 'maxExclusive': 'A0'}, + {'id': 'R', 'minInclusive': 'A0', 'maxExclusive': 'FF'}, + ] + + def fake_read_pk_ranges(collection_link, options, response_hook=None, **kwargs): + headers = {http_constants.HttpHeaders.ETag: '"etag-bad"'} + if response_hook: + response_hook(headers, None) + capture_headers = kwargs.get('_internal_response_headers_capture') + if capture_headers is not None: + capture_headers.update(headers) + + async def async_gen(): + for r in bad_payload: + yield r + + return async_gen() + + cache._document_client._ReadPartitionKeyRanges = MagicMock(side_effect=fake_read_pk_ranges) + + async def _no_sleep(_seconds): + return None + + with patch( + 'azure.cosmos._routing.aio.routing_map_provider.asyncio.sleep', + new=_no_sleep, + ): + with self.assertRaises(CosmosHttpResponseError) as ctx: + await cache.get_routing_map( + "dbs/db1/colls/coll1", + feed_options={}, + force_refresh=True, + previous_routing_map=cached_map, + ) + + self.assertEqual(ctx.exception.status_code, http_constants.StatusCodes.SERVICE_UNAVAILABLE) + + # Critical invariant: the previously-cached map must still be reachable + # via the same key. A 503 from a forced refresh must never evict good + # cache state -- otherwise every transient gateway blip would force the + # next reader to pay a cold-start cost. + self.assertIs( + cache._collection_routing_map_by_item.get("dbs/db1/colls/coll1"), cached_map, + "Cached routing map must be preserved after a 503 from forced refresh -- " + "transient inconsistencies must not evict good cache state." + ) + self.assertEqual( + cache._collection_routing_map_by_item["dbs/db1/colls/coll1"].change_feed_etag, + '"etag-cached"', + "Cached ETag must remain the pre-503 value (no partial overwrite)." 
+ ) + if __name__ == "__main__": unittest.main()