diff --git a/doc/changelog.rst b/doc/changelog.rst index ebbc72047b..3783e63005 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -1,6 +1,14 @@ Changelog ========= +Changes in Version 4.18.0 (2026/XX/XX) + +PyMongo 4.17 brings a number of changes including: + +- Improved performance for MongoDB's Intelligent Workload Management (IWM) by only retrying overload errors when doing so is expected to not worsen server conditions. + See the `IWM `_ or `Overload Errors `_ docs for more information. + + Changes in Version 4.17.0 (2026/04/20) -------------------------------------- diff --git a/pymongo/asynchronous/helpers.py b/pymongo/asynchronous/helpers.py index 0ee99a441a..144272dbd9 100644 --- a/pymongo/asynchronous/helpers.py +++ b/pymongo/asynchronous/helpers.py @@ -26,6 +26,7 @@ from typing import ( Any, Callable, + Optional, TypeVar, cast, ) @@ -82,10 +83,19 @@ async def inner(*args: Any, **kwargs: Any) -> Any: def _backoff( - attempt: int, initial_delay: float = _BACKOFF_INITIAL, max_delay: float = _BACKOFF_MAX + attempt: int, + initial_delay: float = _BACKOFF_INITIAL, + max_delay: float = _BACKOFF_MAX, + retry_after_backoff: Optional[float] = None, ) -> float: - jitter = random.random() # noqa: S311 - return jitter * min(initial_delay * (2**attempt), max_delay) + if retry_after_backoff: + # Jitter of up to +/- 50% of backoff + jitter = random.uniform(-1, 1) * 0.5 # noqa: S311 + + return (jitter * retry_after_backoff) + retry_after_backoff + else: + jitter = random.random() # noqa: S311 + return jitter * min(initial_delay * (2**attempt), max_delay) class _RetryPolicy: @@ -101,9 +111,11 @@ def __init__( self.backoff_initial = backoff_initial self.backoff_max = backoff_max - def backoff(self, attempt: int) -> float: - """Return the backoff duration for the given attempt.""" - return _backoff(max(0, attempt - 1), self.backoff_initial, self.backoff_max) + def backoff(self, attempt: int, retry_after_backoff: Optional[float] = None) -> float: + """Return the actual backoff duration for the given suggested backoff duration.""" + return _backoff( + max(0, attempt - 1), self.backoff_initial, self.backoff_max, retry_after_backoff + ) async def should_retry(self, attempt: int, delay: float) -> bool: """Return if we have retry attempts remaining and the next backoff would not exceed a timeout.""" diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 6aeea53f4c..b1e479e152 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -2794,6 +2794,7 @@ def __init__( self._attempt_number = 0 self._is_run_command = is_run_command self._is_aggregate_write = is_aggregate_write + self._retry_after_backoff_ms = None async def run(self) -> T: """Runs the supplied func() and attempts a retry @@ -2849,6 +2850,9 @@ async def run(self) -> T: overloaded = exc.has_error_label("SystemOverloadedError") if overloaded: self._max_retries = self._client.options.max_adaptive_retries + self._retry_after_backoff_ms = ( + exc._retry_after_ms / 1000 if exc._retry_after_ms else None + ) always_retryable = exc.has_error_label("RetryableError") and overloaded if not self._client.options.retry_reads or ( not always_retryable @@ -2889,6 +2893,9 @@ async def run(self) -> T: overloaded = exc_to_check.has_error_label("SystemOverloadedError") if overloaded: self._max_retries = self._client.options.max_adaptive_retries + self._retry_after_backoff_ms = ( + exc._retry_after_ms / 1000 if exc._retry_after_ms else None + ) always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded # Always retry abortTransaction and commitTransaction up to once @@ -2932,7 +2939,9 @@ async def run(self) -> T: self._always_retryable = always_retryable if overloaded: - delay = self._retry_policy.backoff(self._attempt_number) + delay = self._retry_policy.backoff( + self._attempt_number, self._retry_after_backoff_ms + ) if not await self._retry_policy.should_retry(self._attempt_number, delay): if exc_to_check.has_error_label("NoWritesPerformed") and self._last_error: raise self._last_error from exc diff --git a/pymongo/errors.py b/pymongo/errors.py index 59d9c203d9..f3fdafaf2f 100644 --- a/pymongo/errors.py +++ b/pymongo/errors.py @@ -33,10 +33,16 @@ class PyMongoError(Exception): """Base class for all PyMongo exceptions.""" - def __init__(self, message: str = "", error_labels: Optional[Iterable[str]] = None) -> None: + def __init__( + self, + message: str = "", + error_labels: Optional[Iterable[str]] = None, + retry_after_ms: Optional[str] = None, + ) -> None: super().__init__(message) self._message = message self._error_labels = set(error_labels or []) + self._retry_after_ms = int(retry_after_ms) if retry_after_ms else None def has_error_label(self, label: str) -> bool: """Return True if this error contains the given label. @@ -190,9 +196,15 @@ def __init__( max_wire_version: Optional[int] = None, ) -> None: error_labels = None + retry_after_ms = None if details is not None: error_labels = details.get("errorLabels") - super().__init__(_format_detailed_error(error, details), error_labels=error_labels) + retry_after_ms = details.get("retryAfterMS") + super().__init__( + _format_detailed_error(error, details), + error_labels=error_labels, + retry_after_ms=retry_after_ms, + ) self.__code = code self.__details = details self.__max_wire_version = max_wire_version diff --git a/pymongo/synchronous/helpers.py b/pymongo/synchronous/helpers.py index f24d72aea9..049e9f4410 100644 --- a/pymongo/synchronous/helpers.py +++ b/pymongo/synchronous/helpers.py @@ -26,6 +26,7 @@ from typing import ( Any, Callable, + Optional, TypeVar, cast, ) @@ -82,10 +83,19 @@ def inner(*args: Any, **kwargs: Any) -> Any: def _backoff( - attempt: int, initial_delay: float = _BACKOFF_INITIAL, max_delay: float = _BACKOFF_MAX + attempt: int, + initial_delay: float = _BACKOFF_INITIAL, + max_delay: float = _BACKOFF_MAX, + retry_after_backoff: Optional[float] = None, ) -> float: - jitter = random.random() # noqa: S311 - return jitter * min(initial_delay * (2**attempt), max_delay) + if retry_after_backoff: + # Jitter of up to +/- 50% of backoff + jitter = random.uniform(-1, 1) * 0.5 # noqa: S311 + + return (jitter * retry_after_backoff) + retry_after_backoff + else: + jitter = random.random() # noqa: S311 + return jitter * min(initial_delay * (2**attempt), max_delay) class _RetryPolicy: @@ -101,9 +111,11 @@ def __init__( self.backoff_initial = backoff_initial self.backoff_max = backoff_max - def backoff(self, attempt: int) -> float: - """Return the backoff duration for the given attempt.""" - return _backoff(max(0, attempt - 1), self.backoff_initial, self.backoff_max) + def backoff(self, attempt: int, retry_after_backoff: Optional[float] = None) -> float: + """Return the actual backoff duration for the given suggested backoff duration.""" + return _backoff( + max(0, attempt - 1), self.backoff_initial, self.backoff_max, retry_after_backoff + ) def should_retry(self, attempt: int, delay: float) -> bool: """Return if we have retry attempts remaining and the next backoff would not exceed a timeout.""" diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index 6b7c5d9c98..eff636dcac 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -2785,6 +2785,7 @@ def __init__( self._attempt_number = 0 self._is_run_command = is_run_command self._is_aggregate_write = is_aggregate_write + self._retry_after_backoff_ms = None def run(self) -> T: """Runs the supplied func() and attempts a retry @@ -2840,6 +2841,9 @@ def run(self) -> T: overloaded = exc.has_error_label("SystemOverloadedError") if overloaded: self._max_retries = self._client.options.max_adaptive_retries + self._retry_after_backoff_ms = ( + exc._retry_after_ms / 1000 if exc._retry_after_ms else None + ) always_retryable = exc.has_error_label("RetryableError") and overloaded if not self._client.options.retry_reads or ( not always_retryable @@ -2880,6 +2884,9 @@ def run(self) -> T: overloaded = exc_to_check.has_error_label("SystemOverloadedError") if overloaded: self._max_retries = self._client.options.max_adaptive_retries + self._retry_after_backoff_ms = ( + exc._retry_after_ms / 1000 if exc._retry_after_ms else None + ) always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded # Always retry abortTransaction and commitTransaction up to once @@ -2923,7 +2930,9 @@ def run(self) -> T: self._always_retryable = always_retryable if overloaded: - delay = self._retry_policy.backoff(self._attempt_number) + delay = self._retry_policy.backoff( + self._attempt_number, self._retry_after_backoff_ms + ) if not self._retry_policy.should_retry(self._attempt_number, delay): if exc_to_check.has_error_label("NoWritesPerformed") and self._last_error: raise self._last_error from exc diff --git a/test/asynchronous/test_client_backpressure.py b/test/asynchronous/test_client_backpressure.py index fc3c9b85d8..3f863d8f57 100644 --- a/test/asynchronous/test_client_backpressure.py +++ b/test/asynchronous/test_client_backpressure.py @@ -294,6 +294,66 @@ async def test_04_overload_retries_limited_configured(self): # 6. Assert that the total number of started commands is max_retries + 1. self.assertEqual(len(self.listener.started_events), max_retries + 1) + @patch("random.uniform") + @patch("random.random") + @async_client_context.require_version_min(9, 0, 0, -1) + @async_client_context.require_failCommand_appName + async def test_05_overload_errors_with_retryafterms_override_backoff( + self, random_func, uniform_func + ): + # Drivers should test that overload errors with `retryAfterMS` override the default exponential backoff policy. + + # 1. Let `client` be a `MongoClient`. + client = self.client + + # 2. Let `coll` be a collection. + coll = client.test.test + + # 3. Configure the random number generator used for exponential backoff jitter to always return a number as + # close as possible to `1`. + random_func.return_value = 1 + + # 4. Configure the following failPoint: + fail_point = dict( + mode="alwaysOn", + data=dict( + failCommands=["insert"], + errorCode=462, + errorLabels=["SystemOverloadedError", "RetryableError"], + appName=self.app_name, + ), + ) + async with self.fail_point(fail_point): + # 5. Insert the document `{ a: 1 }`. Expect that the command errors. Measure the duration of the command + # execution. + start0 = perf_counter() + with self.assertRaises(OperationFailure): + await coll.insert_one({"a": 1}) + end0 = perf_counter() + exponential_backoff_time = end0 - start0 + + # 6. Configure the random number generator used for `retryAfterMS` jitter to always return `0`. + uniform_func.return_value = 0 + + # 7. Run the following command to set up `retryAfterMS` on overload errors. + try: + await client.admin.command("setParameter", 1, overloadRetryAfterMS=50) + + # 8. Execute step 5 again. + start1 = perf_counter() + with self.assertRaises(OperationFailure): + await coll.insert_one({"a": 1}) + end1 = perf_counter() + with_retry_after_ms_time = end1 - start1 + finally: + # 9. Run the following command to disable `retryAfterMS` on overload errors. + await client.admin.command("setParameter", 1, overloadRetryAfterMS=0) + + # 10. Compare the time between the two runs. + # The difference in the backoffs is 0.2 seconds. There is a 0.2-second window to account for potential variance + # between the two runs. + self.assertTrue(abs(exponential_backoff_time - (with_retry_after_ms_time + 0.2)) < 0.2) + # Location of JSON test specifications. if _IS_SYNC: diff --git a/test/test_client_backpressure.py b/test/test_client_backpressure.py index c50767db66..157c64b4d4 100644 --- a/test/test_client_backpressure.py +++ b/test/test_client_backpressure.py @@ -292,6 +292,63 @@ def test_04_overload_retries_limited_configured(self): # 6. Assert that the total number of started commands is max_retries + 1. self.assertEqual(len(self.listener.started_events), max_retries + 1) + @patch("random.uniform") + @patch("random.random") + @client_context.require_failCommand_appName + def test_05_overload_errors_with_retryafterms_override_backoff(self, random_func, uniform_func): + # Drivers should test that overload errors with `retryAfterMS` override the default exponential backoff policy. + + # 1. Let `client` be a `MongoClient`. + client = self.client + + # 2. Let `coll` be a collection. + coll = client.test.test + + # 3. Configure the random number generator used for exponential backoff jitter to always return a number as + # close as possible to `1`. + random_func.return_value = 1 + + # 4. Configure the following failPoint: + fail_point = dict( + mode="alwaysOn", + data=dict( + failCommands=["insert"], + errorCode=462, + errorLabels=["SystemOverloadedError", "RetryableError"], + appName=self.app_name, + ), + ) + with self.fail_point(fail_point): + # 5. Insert the document `{ a: 1 }`. Expect that the command errors. Measure the duration of the command + # execution. + start0 = perf_counter() + with self.assertRaises(OperationFailure): + coll.insert_one({"a": 1}) + end0 = perf_counter() + exponential_backoff_time = end0 - start0 + + # 6. Configure the random number generator used for `retryAfterMS` jitter to always return `0`. + uniform_func.return_value = 0 + + # 7. Run the following command to set up `retryAfterMS` on overload errors. + try: + client.admin.command("setParameter", 1, overloadRetryAfterMS=50) + + # 8. Execute step 5 again. + start1 = perf_counter() + with self.assertRaises(OperationFailure): + coll.insert_one({"a": 1}) + end1 = perf_counter() + with_retry_after_ms_time = end1 - start1 + finally: + # 9. Run the following command to disable `retryAfterMS` on overload errors. + client.admin.command("setParameter", 1, overloadRetryAfterMS=0) + + # 10. Compare the time between the two runs. + # The difference in the backoffs is 0.2 seconds. There is a 0.2-second window to account for potential variance + # between the two runs. + self.assertTrue(abs(exponential_backoff_time - (with_retry_after_ms_time + 0.2)) < 0.2) + # Location of JSON test specifications. if _IS_SYNC: