Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
25d163b
PYTHON-5676 Add command_runner.run_command; route network.command() t…
aclark4life Jun 4, 2026
7a08edf
PYTHON-5676 Route Server.run_operation() through run_command
aclark4life Jun 4, 2026
1bd671d
PYTHON-5676 Route collection bulk writes through run_command
aclark4life Jun 4, 2026
34c4929
PYTHON-5676 Route client-level bulk writes through run_command
aclark4life Jun 4, 2026
48b7fdc
PYTHON-5676 Rename network.py to command_encoder.py
aclark4life Jun 4, 2026
ed62887
Noah feedback
aclark4life Jun 5, 2026
ececf43
rename run_command → run_acknowledged_command
aclark4life Jun 5, 2026
c5dc812
Steve feedback
aclark4life Jun 8, 2026
a2fdbda
Fix ImportError: remove _OpReply references dropped by PYTHON-5713
aclark4life Jun 9, 2026
02e9dc3
Steve feedback: deduplicate _raise_if_not_writable; drop redundant _p…
aclark4life Jun 9, 2026
c56feb6
Copilot feedback
aclark4life Jun 10, 2026
1202a33
ruff format
aclark4life Jun 11, 2026
2ec97b7
Restore location of _raise_if_not_writable
aclark4life Jun 11, 2026
f98b77e
Restore location of _raise_if_not_writable
aclark4life Jun 11, 2026
a77683e
PYTHON-5676 Address Noah's review comments
aclark4life Jun 12, 2026
d57f51c
Copilot feedback
aclark4life Jun 15, 2026
ad39c86
PYTHON-5676 Pass set_conn_more_to_come=False in client bulk write com…
aclark4life Jun 16, 2026
569600c
Merge upstream/master into PYTHON-5676
blink1073 Jun 16, 2026
a4bf64f
PYTHON-5676 Add run_bulk_write_command to simplify bulk write API
blink1073 Jun 16, 2026
d82eac8
PYTHON-5676 Remove tests for _convert_write_result removed in this PR
blink1073 Jun 16, 2026
4c907eb
PYTHON-5676 Rename _run_command to run_command, remove run_acknowledg…
blink1073 Jun 16, 2026
a35d3d4
PYTHON-5676 Introduce _BulkWriteContextProto to simplify run_bulk_wri…
blink1073 Jun 16, 2026
210a67e
PYTHON-5676 Split run_command into private _run_command and clean pub…
blink1073 Jun 16, 2026
2576b7b
PYTHON-5676 Merge command() into run_command() in command_runner
blink1073 Jun 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 30 additions & 163 deletions pymongo/asynchronous/bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
from __future__ import annotations

import copy
import datetime
import logging
from collections.abc import Iterator, Mapping, MutableMapping
from itertools import islice
from typing import (
Expand All @@ -34,9 +32,9 @@
from bson.objectid import ObjectId
from bson.raw_bson import RawBSONDocument
from pymongo import _csot, common
from pymongo.asynchronous.client_session import (
AsyncClientSession,
_validate_session_write_concern,
from pymongo.asynchronous.client_session import AsyncClientSession, _validate_session_write_concern
from pymongo.asynchronous.command_runner import (
run_bulk_write_command,
)
from pymongo.asynchronous.helpers import _handle_reauth
from pymongo.bulk_shared import (
Expand All @@ -54,18 +52,14 @@
from pymongo.errors import (
ConfigurationError,
InvalidOperation,
NotPrimaryError,
OperationFailure,
)
from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
from pymongo.message import (
_DELETE,
_INSERT,
_UPDATE,
_BulkWriteContext,
_convert_exception,
_convert_write_result,
_EncryptedBulkWriteContext,
_randint,
)
Expand Down Expand Up @@ -251,83 +245,16 @@ async def write_command(
docs: list[Mapping[str, Any]],
client: AsyncMongoClient[Any],
) -> dict[str, Any]:
"""A proxy for SocketInfo.write_command that handles event publishing."""
"""Run a batch write command, returning the response as a dict."""
cmd[bwc.field] = docs
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.STARTED,
clientId=client._topology_settings._topology_id,
command=cmd,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
if bwc.publish:
bwc._start(cmd, request_id, docs)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_BulkWriteContext._start, _BulkWriteContext._succeed, _BulkWriteContext._fail, and _ClientBulkWriteContext._start are all dead code.

try:
if bwc.session is not None and bwc.session._starting_transaction:
bwc.session._transaction.set_in_progress()
reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc]
duration = datetime.datetime.now() - bwc.start_time
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=reply,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
if bwc.publish:
bwc._succeed(request_id, reply, duration) # type: ignore[arg-type]
await client._process_response(reply, bwc.session) # type: ignore[arg-type]
except Exception as exc:
duration = datetime.datetime.now() - bwc.start_time
if isinstance(exc, (NotPrimaryError, OperationFailure)):
failure: _DocumentOut = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)

if bwc.publish:
bwc._fail(request_id, failure, duration)
# Process the response from the server.
if isinstance(exc, (NotPrimaryError, OperationFailure)):
await client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
raise
return reply # type: ignore[return-value]
result_docs, _, _ = await run_bulk_write_command(
bwc, # type: ignore[arg-type]
cmd,
request_id,
msg,
client=client,
)
return result_docs[0]

async def unack_write(
self,
Expand All @@ -339,83 +266,23 @@ async def unack_write(
docs: list[Mapping[str, Any]],
client: AsyncMongoClient[Any],
) -> Optional[Mapping[str, Any]]:
"""A proxy for AsyncConnection.unack_write that handles event publishing."""
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.STARTED,
clientId=client._topology_settings._topology_id,
command=cmd,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
if bwc.publish:
cmd = bwc._start(cmd, request_id, docs)
try:
result = await bwc.conn.unack_write(msg, max_doc_size) # type: ignore[func-returns-value, misc, override]
duration = datetime.datetime.now() - bwc.start_time
if result is not None:
reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_convert_write_result is also dead code now.

else:
# Comply with APM spec.
reply = {"ok": 1}
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=reply,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
if bwc.publish:
bwc._succeed(request_id, reply, duration)
except Exception as exc:
duration = datetime.datetime.now() - bwc.start_time
if isinstance(exc, OperationFailure):
failure: _DocumentOut = _convert_write_result(bwc.name, cmd, exc.details) # type: ignore[arg-type]
elif isinstance(exc, NotPrimaryError):
failure = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
if bwc.publish:
assert bwc.start_time is not None
bwc._fail(request_id, failure, duration)
raise
return result # type: ignore[return-value]
"""Send an unacknowledged batch write command."""
# Historically the STARTED log omits the documents while the published
# CommandStartedEvent includes them, so log ``cmd`` but publish a copy
# carrying the ``docs`` field.
published = dict(cmd)
published[bwc.field] = docs
await run_bulk_write_command(
bwc, # type: ignore[arg-type]
cmd,
request_id,
msg,
client=client,
orig=published,
max_doc_size=max_doc_size,
unacknowledged=True,
)
return None

async def _execute_batch_unack(
self,
Expand Down Expand Up @@ -487,7 +354,7 @@ async def _execute_command(
run = self.current_run

# AsyncConnection.command validates the session, but we use
# AsyncConnection.write_command
# run_bulk_write_command.
conn.validate_session(client, session)
last_run = False

Expand Down
Loading
Loading