Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#### Bugs Fixed
* Fixed bug where `CosmosClient` construction with AAD credentials would crash at startup if the semantic reranking inference endpoint environment variable was not set, even when semantic reranking was not being used. The inference service is now lazily initialized on first use. See [PR 46243](https://github.com/Azure/azure-sdk-for-python/pull/46243)
* Fixed bug where region names in `preferred_locations` and `excluded_locations` (client-level and per-request) were not matched tolerantly for differences in case, whitespace, hyphens, and underscores. See [PR 46937](https://github.com/Azure/azure-sdk-for-python/pull/46937)
* Fixed bug where a `ValueError("Ranges overlap")` or an `AssertionError("code bug: returned overlapping ranges ... is empty")` from the partition key range cache could escape to the caller when the `/pkranges` response contained a transiently inconsistent snapshot (overlap or gap). See [PR 47091](https://github.com/Azure/azure-sdk-for-python/pull/47091)

#### Other Changes
* Reduced per-client memory overhead when partition-level circuit breaker (PPCB) is enabled by sharing the partition key range routing map cache across CosmosClient instances connected to the same endpoint, and stripping unused fields from cached partition key ranges using compact PKRange namedtuples. See [PR 46297](https://github.com/Azure/azure-sdk-for-python/pull/46297)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,22 @@
"""

import logging
import random
from typing import Any, Dict, List, Optional, Tuple

from .. import _base, http_constants
from .collection_routing_map import CollectionRoutingMap, _build_routing_map_from_ranges
from ..exceptions import CosmosHttpResponseError
# ``_OverlapDetected`` and ``_GapDetected`` are imported (but not referenced
# inside this module) so that provider modules and tests can import them from
# this single module instead of reaching into ``collection_routing_map``
# directly. Pylint reports ``unused-import`` on the ``from`` line as a whole
# (not on the individual names), so the disable must live on that line.
from .collection_routing_map import ( # pylint: disable=unused-import
CollectionRoutingMap,
_build_routing_map_from_ranges,
_OverlapDetected,
_GapDetected,
)
from . import routing_range
from .routing_range import (
PKRange,
Expand All @@ -44,6 +56,98 @@

PAGE_SIZE_CHANGE_FEED = "-1" # Return all available changes

# Maximum retry attempts for transient full-load /pkranges inconsistencies
# (overlap OR gap) before surfacing a transient HTTP 503. Centralised here so
# the sync and async providers share one source of truth.
_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS = 3
# Initial backoff between inconsistency retries; doubles each attempt. With
# ``_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS = 3``, attempt 3 raises 503 before
# sleeping, so only the post-attempt-1 and post-attempt-2 backoffs are slept --
# deterministic worst case is the sum of the first two schedule entries
# (``_TRANSIENT_SNAPSHOT_RETRY_INITIAL_BACKOFF_SECONDS`` and that value
# doubled); expected wall time is half of that under full jitter.
_TRANSIENT_SNAPSHOT_RETRY_INITIAL_BACKOFF_SECONDS = 0.5


def _jittered_backoff(backoff_seconds: float) -> float:
"""Return a uniformly-jittered backoff in the range ``[0, backoff_seconds]``.

Implements the "full jitter" strategy so concurrent retriers in different
processes do not retry in lockstep against the same gateway node.

:param float backoff_seconds: Deterministic upper bound for the backoff,
in seconds. Must be non-negative.
:return: A uniformly-distributed sleep value in ``[0, backoff_seconds]``.
:rtype: float
"""
return random.uniform(0, backoff_seconds)


def _handle_transient_snapshot_retry_decision(
*,
retry_attempt_count: int,
collection_link: str,
logger: logging.Logger, # pylint: disable=redefined-outer-name
) -> float:
"""Decide what to do after the full-load builder reported a transient
snapshot inconsistency (overlap or gap).

Returns a jittered backoff for the caller to sleep before the next
attempt; raises :class:`CosmosHttpResponseError` (HTTP 503) once
``_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS`` is reached. Under the default
budget the final attempt raises 503 before sleeping, so only the first
``_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS - 1`` attempts produce a sleep
(deterministic upper bounds doubling from
``_TRANSIENT_SNAPSHOT_RETRY_INITIAL_BACKOFF_SECONDS``). The caller
performs the actual sleep (``time.sleep`` vs ``await asyncio.sleep``),
which is the only line that differs between the sync and async
providers.

:keyword int retry_attempt_count: Attempts so far, including the one
that just failed. Pass ``1`` after the first failure.
:keyword str collection_link: Used in log messages and the 503 body.
:keyword logging.Logger logger: Caller's module-level logger.
:return: Jittered backoff seconds in ``[0, deterministic_upper_bound]``.
:rtype: float
:raises CosmosHttpResponseError: When the attempt budget is exhausted.
"""
if retry_attempt_count >= _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS:
logger.error(
"Full-load routing-map fetch for collection '%s' detected a "
"transient snapshot inconsistency (overlap or gap) on every "
"one of %d attempt(s). Surfacing as transient HTTP 503 so the "
"caller's retry policy can take over.",
collection_link,
retry_attempt_count,
)
raise CosmosHttpResponseError(
status_code=http_constants.StatusCodes.SERVICE_UNAVAILABLE,
message=(
"Failed to build routing map for collection '{}': transient "
"snapshot inconsistency (overlap or gap) persisted across {} "
"full-load attempt(s). Surfaced as a retryable transient "
"error so the upstream retry policy can take over."
).format(collection_link, retry_attempt_count),
)

deterministic_backoff = (
_TRANSIENT_SNAPSHOT_RETRY_INITIAL_BACKOFF_SECONDS * (2 ** (retry_attempt_count - 1))
)
jittered_backoff = _jittered_backoff(deterministic_backoff)
logger.warning(
"Full-load routing-map fetch for collection '%s' detected a transient "
"snapshot inconsistency (overlap or gap) (attempt %d/%d). Sleeping "
"%.2fs (jittered from upper bound %.2fs) and retrying.",
collection_link,
retry_attempt_count,
_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS,
jittered_backoff,
deterministic_backoff,
)
return jittered_backoff




def is_cache_unchanged_since_previous(
collection_routing_map_by_item: Dict[str, CollectionRoutingMap],
Expand Down Expand Up @@ -149,7 +253,7 @@ def _resolve_endpoint(client: Any) -> str:


class _IncrementalMergeFailed(Exception):
"""Sentinel raised by :func:`process_fetched_ranges` when the
"""Private exception type raised by :func:`process_fetched_ranges` when the
incremental update cannot resolve all partition key ranges.

The caller decides how to recover: retry the incremental fetch
Expand All @@ -162,7 +266,7 @@ def process_fetched_ranges(
collection_id: str,
collection_link: str,
new_etag: Optional[str],
) -> Optional[CollectionRoutingMap]:
) -> CollectionRoutingMap:
"""Turn raw PK-range results into a :class:`CollectionRoutingMap`.

Handles both initial-load (when *previous_routing_map* is ``None``)
Expand All @@ -177,10 +281,8 @@ def process_fetched_ranges(
:param str collection_id: The ID of the collection.
:param str collection_link: The link to the collection.
:param str new_etag: The ETag from the change feed response, or ``None``.
:return: The new/updated routing map, or ``None`` when an
initial load yields no ranges.
:return: The new/updated routing map.
:rtype: ~azure.cosmos._routing.collection_routing_map.CollectionRoutingMap
or None
:raises _IncrementalMergeFailed: When the incremental path cannot
resolve all ranges. The caller catches this and either retries
the incremental fetch or falls back to a full refresh.
Expand Down Expand Up @@ -257,7 +359,25 @@ def process_fetched_ranges(

unresolved = next_unresolved

result = previous_routing_map.try_combine(range_tuples, effective_etag)
try:
result = previous_routing_map.try_combine(range_tuples, effective_etag)
except ValueError as overlap_error:
Comment thread
dibahlfi marked this conversation as resolved.
# Convert the overlap ``ValueError`` from ``try_combine`` into
# ``_IncrementalMergeFailed`` so the caller retries the incremental
# fetch and ultimately falls back to the full-load path (which has
# its own ``_OverlapDetected`` retry + 503 safety net). Narrow to
# the literal ``"Ranges overlap"`` prefix (kept stable in
# ``is_complete_set_of_range`` and pinned by the regression tests)
# so any future unrelated ``ValueError`` surfaces as a real bug.
if not str(overlap_error).startswith("Ranges overlap"):
raise
logger.warning(
"Incremental merge for collection '%s' produced overlapping ranges: %s. "
"Converting to _IncrementalMergeFailed so the caller retries / "
"falls back to a full refresh.",
collection_link, str(overlap_error),
)
raise _IncrementalMergeFailed() from overlap_error
if not result:
logger.warning(
"Incremental merge resulted in incomplete routing map for "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
determine_refresh_action,
get_smart_overlapping_ranges,
_IncrementalMergeFailed,
_OverlapDetected,
_GapDetected,
_handle_transient_snapshot_retry_decision,
)


Expand Down Expand Up @@ -103,6 +106,8 @@
# Number of extra incremental attempts after an incomplete incremental merge
# before falling back to a full routing-map refresh.
_INCOMPLETE_ROUTING_MAP_MAX_RETRIES = 1


class PartitionKeyRangeCache(object):
"""
PartitionKeyRangeCache provides list of effective partition key ranges for a
Expand Down Expand Up @@ -307,9 +312,12 @@ async def get_routing_map(
**kwargs
)

# Update the cache.
if new_routing_map:
self._collection_routing_map_by_item[collection_id] = new_routing_map
# ``_fetch_routing_map`` always returns a populated
# ``CollectionRoutingMap`` on success and raises otherwise --
# No defensive None-check needed; one
# would only mask a future regression by silently leaving
# the cache empty instead of surfacing the failure.
self._collection_routing_map_by_item[collection_id] = new_routing_map

return self._collection_routing_map_by_item.get(collection_id)

Expand All @@ -321,7 +329,7 @@ async def _fetch_routing_map(
previous_routing_map: Optional[CollectionRoutingMap],
feed_options: Optional[Dict[str, Any]],
**kwargs
) -> Optional[CollectionRoutingMap]:
) -> CollectionRoutingMap:
"""Fetches or updates the routing map using an incremental change feed.

This method handles both the initial loading of a collection's routing
Expand All @@ -331,18 +339,30 @@ async def _fetch_routing_map(
of inconsistencies during an incremental update, it automatically falls
back to a full refresh.

Always returns a populated :class:`CollectionRoutingMap` on success.
Failure modes raise an exception rather than returning ``None``:
``CosmosHttpResponseError`` for the underlying network call (including
the transient HTTP 503 raised once the snapshot-inconsistency retry
budget is exhausted), or the internal ``_IncrementalMergeFailed``
signal when the incremental-merge path cannot make progress and there
is no previous map to fall back on.

:param str collection_link: The link to the collection.
:param str collection_id: The ID of the collection.
:param previous_routing_map: The last known routing map for incremental updates.
:type previous_routing_map: azure.cosmos.routing.collection_routing_map.CollectionRoutingMap or None
:param feed_options: Options for the change feed request.
:type feed_options: dict or None
:return: The updated or newly created CollectionRoutingMap, or None if the update fails.
:rtype: azure.cosmos.routing.collection_routing_map.CollectionRoutingMap or None
:raises CosmosHttpResponseError: If the underlying request to fetch ranges fails.
:return: The updated or newly created CollectionRoutingMap.
:rtype: azure.cosmos.routing.collection_routing_map.CollectionRoutingMap
:raises CosmosHttpResponseError: If the underlying ``/pkranges`` fetch
fails, or if every snapshot-inconsistency retry exhausts the
budget (surfaced as HTTP 503 so the upstream retry policy can
take over).
"""
current_previous_map = previous_routing_map
incomplete_attempt_count = 0
inconsistency_attempt_count = 0
Comment thread
dibahlfi marked this conversation as resolved.

while True:
request_kwargs = dict(kwargs)
Expand Down Expand Up @@ -398,6 +418,22 @@ async def _fetch_routing_map(
continue

raise
except (_OverlapDetected, _GapDetected):
# Reset ``current_previous_map`` to ``None`` so the next
# iteration runs the full-load path: we do not want to keep
# retrying an incremental fetch against the same inconsistent
# base. ``_handle_transient_snapshot_retry_decision`` returns
# the backoff or raises a 503 once the attempt budget is
# exhausted.
inconsistency_attempt_count += 1
backoff = _handle_transient_snapshot_retry_decision(
retry_attempt_count=inconsistency_attempt_count,
collection_link=collection_link,
logger=logger,
)
await asyncio.sleep(backoff)
current_previous_map = None
continue

async def get_range_by_partition_key_range_id(
self,
Expand Down
Loading
Loading