Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions doc/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
Changelog
=========

Changes in Version 4.18.0 (2026/XX/XX)
--------------------------------------

PyMongo 4.18 brings a number of changes including:

- Improved performance for MongoDB's Intelligent Workload Management (IWM) by only retrying overload errors when doing so is not expected to worsen server conditions.
  See the `IWM <https://www.mongodb.com/docs/atlas/production-notes>`_ or `Overload Errors <https://www.mongodb.com/docs/atlas/overload-errors/?interface=driver&language=python>`_ docs for more information.


Changes in Version 4.17.0 (2026/04/20)
--------------------------------------

Expand Down
24 changes: 18 additions & 6 deletions pymongo/asynchronous/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from typing import (
Any,
Callable,
Optional,
TypeVar,
cast,
)
Expand Down Expand Up @@ -82,10 +83,19 @@ async def inner(*args: Any, **kwargs: Any) -> Any:


def _backoff(
    attempt: int,
    initial_delay: float = _BACKOFF_INITIAL,
    max_delay: float = _BACKOFF_MAX,
    retry_after_backoff: Optional[float] = None,
) -> float:
    """Return a jittered delay to wait before the next retry.

    :param attempt: Exponent applied to 2 when computing the exponential backoff.
    :param initial_delay: Base delay that is doubled for every attempt.
    :param max_delay: Cap applied to the exponential delay before jitter.
    :param retry_after_backoff: Optional suggested delay. When it is a positive
        number it replaces the exponential backoff; ``None``, zero, negative
        and NaN values are ignored.
    :return: The delay, in the same unit as the inputs.
    """
    # Only honour a positive hint so the result can never be a negative or NaN delay.
    if retry_after_backoff is not None and retry_after_backoff > 0:
        # Jitter of up to +/- 50% of the suggested backoff
        jitter = random.uniform(-1, 1) * 0.5  # noqa: S311
        return (jitter * retry_after_backoff) + retry_after_backoff

    # No usable hint: random fraction of the capped exponential backoff.
    jitter = random.random()  # noqa: S311
    return jitter * min(initial_delay * (2**attempt), max_delay)


class _RetryPolicy:
Expand All @@ -101,9 +111,11 @@ def __init__(
self.backoff_initial = backoff_initial
self.backoff_max = backoff_max

def backoff(self, attempt: int, retry_after_backoff: Optional[float] = None) -> float:
    """Return the backoff duration to wait before retrying.

    :param attempt: Attempt number; it is reduced by one (never below zero)
        before being used as the exponential backoff exponent.
    :param retry_after_backoff: Optional suggested backoff duration, forwarded
        unchanged to ``_backoff`` together with this policy's initial and
        maximum backoff.
    :return: The delay computed by ``_backoff``.
    """
    return _backoff(
        max(0, attempt - 1), self.backoff_initial, self.backoff_max, retry_after_backoff
    )

async def should_retry(self, attempt: int, delay: float) -> bool:
"""Return if we have retry attempts remaining and the next backoff would not exceed a timeout."""
Expand Down
11 changes: 10 additions & 1 deletion pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2794,6 +2794,7 @@ def __init__(
self._attempt_number = 0
self._is_run_command = is_run_command
self._is_aggregate_write = is_aggregate_write
self._retry_after_backoff_ms = None

async def run(self) -> T:
"""Runs the supplied func() and attempts a retry
Expand Down Expand Up @@ -2849,6 +2850,9 @@ async def run(self) -> T:
overloaded = exc.has_error_label("SystemOverloadedError")
if overloaded:
self._max_retries = self._client.options.max_adaptive_retries
self._retry_after_backoff_ms = (
exc._retry_after_ms / 1000 if exc._retry_after_ms else None
)
always_retryable = exc.has_error_label("RetryableError") and overloaded
if not self._client.options.retry_reads or (
not always_retryable
Expand Down Expand Up @@ -2889,6 +2893,9 @@ async def run(self) -> T:
overloaded = exc_to_check.has_error_label("SystemOverloadedError")
if overloaded:
self._max_retries = self._client.options.max_adaptive_retries
self._retry_after_backoff_ms = (
exc._retry_after_ms / 1000 if exc._retry_after_ms else None
)
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded

# Always retry abortTransaction and commitTransaction up to once
Expand Down Expand Up @@ -2932,7 +2939,9 @@ async def run(self) -> T:

self._always_retryable = always_retryable
if overloaded:
delay = self._retry_policy.backoff(self._attempt_number)
delay = self._retry_policy.backoff(
self._attempt_number, self._retry_after_backoff_ms
)
if not await self._retry_policy.should_retry(self._attempt_number, delay):
if exc_to_check.has_error_label("NoWritesPerformed") and self._last_error:
raise self._last_error from exc
Expand Down
16 changes: 14 additions & 2 deletions pymongo/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,16 @@
class PyMongoError(Exception):
"""Base class for all PyMongo exceptions."""

def __init__(self, message: str = "", error_labels: Optional[Iterable[str]] = None) -> None:
def __init__(
self,
message: str = "",
error_labels: Optional[Iterable[str]] = None,
retry_after_ms: Optional[str] = None,
) -> None:
super().__init__(message)
self._message = message
self._error_labels = set(error_labels or [])
self._retry_after_ms = int(retry_after_ms) if retry_after_ms else None

def has_error_label(self, label: str) -> bool:
"""Return True if this error contains the given label.
Expand Down Expand Up @@ -190,9 +196,15 @@ def __init__(
max_wire_version: Optional[int] = None,
) -> None:
error_labels = None
retry_after_ms = None
if details is not None:
error_labels = details.get("errorLabels")
super().__init__(_format_detailed_error(error, details), error_labels=error_labels)
retry_after_ms = details.get("retryAfterMS")
super().__init__(
_format_detailed_error(error, details),
error_labels=error_labels,
retry_after_ms=retry_after_ms,
)
self.__code = code
self.__details = details
self.__max_wire_version = max_wire_version
Expand Down
24 changes: 18 additions & 6 deletions pymongo/synchronous/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from typing import (
Any,
Callable,
Optional,
TypeVar,
cast,
)
Expand Down Expand Up @@ -82,10 +83,19 @@ def inner(*args: Any, **kwargs: Any) -> Any:


def _backoff(
    attempt: int,
    initial_delay: float = _BACKOFF_INITIAL,
    max_delay: float = _BACKOFF_MAX,
    retry_after_backoff: Optional[float] = None,
) -> float:
    """Return a jittered delay to wait before the next retry.

    :param attempt: Exponent applied to 2 when computing the exponential backoff.
    :param initial_delay: Base delay that is doubled for every attempt.
    :param max_delay: Cap applied to the exponential delay before jitter.
    :param retry_after_backoff: Optional suggested delay. When it is a positive
        number it replaces the exponential backoff; ``None``, zero, negative
        and NaN values are ignored.
    :return: The delay, in the same unit as the inputs.
    """
    # Only honour a positive hint so the result can never be a negative or NaN delay.
    if retry_after_backoff is not None and retry_after_backoff > 0:
        # Jitter of up to +/- 50% of the suggested backoff
        jitter = random.uniform(-1, 1) * 0.5  # noqa: S311
        return (jitter * retry_after_backoff) + retry_after_backoff

    # No usable hint: random fraction of the capped exponential backoff.
    jitter = random.random()  # noqa: S311
    return jitter * min(initial_delay * (2**attempt), max_delay)


class _RetryPolicy:
Expand All @@ -101,9 +111,11 @@ def __init__(
self.backoff_initial = backoff_initial
self.backoff_max = backoff_max

def backoff(self, attempt: int, retry_after_backoff: Optional[float] = None) -> float:
    """Return the backoff duration to wait before retrying.

    :param attempt: Attempt number; it is reduced by one (never below zero)
        before being used as the exponential backoff exponent.
    :param retry_after_backoff: Optional suggested backoff duration, forwarded
        unchanged to ``_backoff`` together with this policy's initial and
        maximum backoff.
    :return: The delay computed by ``_backoff``.
    """
    return _backoff(
        max(0, attempt - 1), self.backoff_initial, self.backoff_max, retry_after_backoff
    )

def should_retry(self, attempt: int, delay: float) -> bool:
"""Return if we have retry attempts remaining and the next backoff would not exceed a timeout."""
Expand Down
11 changes: 10 additions & 1 deletion pymongo/synchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2785,6 +2785,7 @@ def __init__(
self._attempt_number = 0
self._is_run_command = is_run_command
self._is_aggregate_write = is_aggregate_write
self._retry_after_backoff_ms = None

def run(self) -> T:
"""Runs the supplied func() and attempts a retry
Expand Down Expand Up @@ -2840,6 +2841,9 @@ def run(self) -> T:
overloaded = exc.has_error_label("SystemOverloadedError")
if overloaded:
self._max_retries = self._client.options.max_adaptive_retries
self._retry_after_backoff_ms = (
exc._retry_after_ms / 1000 if exc._retry_after_ms else None
)
always_retryable = exc.has_error_label("RetryableError") and overloaded
if not self._client.options.retry_reads or (
not always_retryable
Expand Down Expand Up @@ -2880,6 +2884,9 @@ def run(self) -> T:
overloaded = exc_to_check.has_error_label("SystemOverloadedError")
if overloaded:
self._max_retries = self._client.options.max_adaptive_retries
self._retry_after_backoff_ms = (
exc._retry_after_ms / 1000 if exc._retry_after_ms else None
)
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded

# Always retry abortTransaction and commitTransaction up to once
Expand Down Expand Up @@ -2923,7 +2930,9 @@ def run(self) -> T:

self._always_retryable = always_retryable
if overloaded:
delay = self._retry_policy.backoff(self._attempt_number)
delay = self._retry_policy.backoff(
self._attempt_number, self._retry_after_backoff_ms
)
if not self._retry_policy.should_retry(self._attempt_number, delay):
if exc_to_check.has_error_label("NoWritesPerformed") and self._last_error:
raise self._last_error from exc
Expand Down
60 changes: 60 additions & 0 deletions test/asynchronous/test_client_backpressure.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,66 @@ async def test_04_overload_retries_limited_configured(self):
# 6. Assert that the total number of started commands is max_retries + 1.
self.assertEqual(len(self.listener.started_events), max_retries + 1)

@patch("random.uniform")
@patch("random.random")
@async_client_context.require_version_min(9, 0, 0, -1)
@async_client_context.require_failCommand_appName
async def test_05_overload_errors_with_retryafterms_override_backoff(
    self, random_func, uniform_func
):
    """Check that `retryAfterMS` on overload errors overrides exponential backoff.

    `random_func` is the mock for `random.random` and `uniform_func` the mock
    for `random.uniform` (decorators are applied bottom-up).
    """
    # Drivers should test that overload errors with `retryAfterMS` override the default exponential backoff policy.

    # 1. Let `client` be a `MongoClient`.
    client = self.client

    # 2. Let `coll` be a collection.
    coll = client.test.test

    # 3. Configure the random number generator used for exponential backoff jitter to always return a number as
    # close as possible to `1`.
    random_func.return_value = 1

    # 4. Configure the following failPoint:
    fail_point = dict(
        mode="alwaysOn",
        data=dict(
            failCommands=["insert"],
            errorCode=462,
            errorLabels=["SystemOverloadedError", "RetryableError"],
            appName=self.app_name,
        ),
    )
    async with self.fail_point(fail_point):
        # 5. Insert the document `{ a: 1 }`. Expect that the command errors. Measure the duration of the command
        # execution.
        start0 = perf_counter()
        with self.assertRaises(OperationFailure):
            await coll.insert_one({"a": 1})
        exponential_backoff_time = perf_counter() - start0

        # 6. Configure the random number generator used for `retryAfterMS` jitter to always return `0`.
        uniform_func.return_value = 0

        # 7. Run the following command to set up `retryAfterMS` on overload errors.
        try:
            await client.admin.command("setParameter", 1, overloadRetryAfterMS=50)

            # 8. Execute step 5 again.
            start1 = perf_counter()
            with self.assertRaises(OperationFailure):
                await coll.insert_one({"a": 1})
            with_retry_after_ms_time = perf_counter() - start1
        finally:
            # 9. Run the following command to disable `retryAfterMS` on overload errors.
            await client.admin.command("setParameter", 1, overloadRetryAfterMS=0)

        # 10. Compare the time between the two runs.
        # The difference in the backoffs is 0.2 seconds. There is a 0.2-second window to account for potential variance
        # between the two runs.
        # assertLess reports both operands on failure, unlike assertTrue on a comparison.
        self.assertLess(abs(exponential_backoff_time - (with_retry_after_ms_time + 0.2)), 0.2)


# Location of JSON test specifications.
if _IS_SYNC:
Expand Down
57 changes: 57 additions & 0 deletions test/test_client_backpressure.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,63 @@ def test_04_overload_retries_limited_configured(self):
# 6. Assert that the total number of started commands is max_retries + 1.
self.assertEqual(len(self.listener.started_events), max_retries + 1)

@patch("random.uniform")
@patch("random.random")
@client_context.require_version_min(9, 0, 0, -1)
@client_context.require_failCommand_appName
def test_05_overload_errors_with_retryafterms_override_backoff(self, random_func, uniform_func):
    """Check that `retryAfterMS` on overload errors overrides exponential backoff.

    `random_func` is the mock for `random.random` and `uniform_func` the mock
    for `random.uniform` (decorators are applied bottom-up).
    """
    # Drivers should test that overload errors with `retryAfterMS` override the default exponential backoff policy.

    # 1. Let `client` be a `MongoClient`.
    client = self.client

    # 2. Let `coll` be a collection.
    coll = client.test.test

    # 3. Configure the random number generator used for exponential backoff jitter to always return a number as
    # close as possible to `1`.
    random_func.return_value = 1

    # 4. Configure the following failPoint:
    fail_point = dict(
        mode="alwaysOn",
        data=dict(
            failCommands=["insert"],
            errorCode=462,
            errorLabels=["SystemOverloadedError", "RetryableError"],
            appName=self.app_name,
        ),
    )
    with self.fail_point(fail_point):
        # 5. Insert the document `{ a: 1 }`. Expect that the command errors. Measure the duration of the command
        # execution.
        start0 = perf_counter()
        with self.assertRaises(OperationFailure):
            coll.insert_one({"a": 1})
        exponential_backoff_time = perf_counter() - start0

        # 6. Configure the random number generator used for `retryAfterMS` jitter to always return `0`.
        uniform_func.return_value = 0

        # 7. Run the following command to set up `retryAfterMS` on overload errors.
        try:
            client.admin.command("setParameter", 1, overloadRetryAfterMS=50)

            # 8. Execute step 5 again.
            start1 = perf_counter()
            with self.assertRaises(OperationFailure):
                coll.insert_one({"a": 1})
            with_retry_after_ms_time = perf_counter() - start1
        finally:
            # 9. Run the following command to disable `retryAfterMS` on overload errors.
            client.admin.command("setParameter", 1, overloadRetryAfterMS=0)

        # 10. Compare the time between the two runs.
        # The difference in the backoffs is 0.2 seconds. There is a 0.2-second window to account for potential variance
        # between the two runs.
        # assertLess reports both operands on failure, unlike assertTrue on a comparison.
        self.assertLess(abs(exponential_backoff_time - (with_retry_after_ms_time + 0.2)), 0.2)


# Location of JSON test specifications.
if _IS_SYNC:
Expand Down
Loading