PYTHON-5676 Consolidate command execution logic#2852
Conversation
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
|
Failures related to #2853 |
There was a problem hiding this comment.
Pull request overview
This PR consolidates MongoDB command execution (logging + command monitoring/APM publishing + send/receive + response processing/checking + clusterTime gossip + optional decryption) into a single shared implementation (run_command) and updates the major call sites (standard command path, cursor operations, bulk writes) to route through it. This reduces duplicated “command execution skeleton” logic while keeping behavioral compatibility.
Changes:
- Introduces
pymongo.{a,}synchronous.command_runner.run_command()as the central command execution path. - Refactors
Server.run_operation()(cursor operations) and bulk/client-bulk write paths to userun_command()with hooks for legacy reply shaping and ordering constraints. - Renames/repurposes the former
networkcommand module intocommand_encoderto reflect its role as an encoder + pre-flight layer (with the actual I/O handled byrun_command/network_layer).
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| pymongo/asynchronous/command_runner.py | Adds the async shared run_command() implementation (logging/APM + transport + response handling). |
| pymongo/synchronous/command_runner.py | Adds the sync shared run_command() implementation (mirror of async via synchro). |
| pymongo/asynchronous/command_encoder.py | Routes standard async command execution through run_command(); updates module docstring. |
| pymongo/synchronous/command_encoder.py | Routes standard sync command execution through run_command(); updates module docstring. |
| pymongo/asynchronous/server.py | Refactors async cursor operations to use run_command() with reply shaping for monitoring. |
| pymongo/synchronous/server.py | Refactors sync cursor operations to use run_command() with reply shaping for monitoring. |
| pymongo/asynchronous/bulk.py | Refactors async bulk write command/unack paths to use run_command() and preserve ordering. |
| pymongo/synchronous/bulk.py | Refactors sync bulk write command/unack paths to use run_command() and preserve ordering. |
| pymongo/asynchronous/client_bulk.py | Refactors async client-bulk write command/unack paths to use run_command() while preserving top-level error embedding behavior. |
| pymongo/synchronous/client_bulk.py | Refactors sync client-bulk write command/unack paths to use run_command() while preserving top-level error embedding behavior. |
| pymongo/asynchronous/pool.py | Updates import to use command_encoder.command after module rename. |
| pymongo/synchronous/pool.py | Updates import to use command_encoder.command after module rename. |
Comments suppressed due to low confidence (1)
pymongo/synchronous/command_encoder.py:21
- Docstring references
pymongo.command_runner.run_command, but there is no top-levelpymongo.command_runnermodule in this codebase. Since this file importsrun_commandfrompymongo.synchronous.command_runner, the Sphinx reference should match to avoid broken cross-references / confusion.
| if bwc.publish: | ||
| bwc._start(cmd, request_id, op_docs, ns_docs) | ||
| try: | ||
| reply = bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type] |
There was a problem hiding this comment.
AsyncConnection.write_command doesn't appear to have any remaining callers, so it (and its sync counterpart) are dead code that should be removed.
| result = await bwc.conn.unack_write(msg, max_doc_size) # type: ignore[func-returns-value, misc, override] | ||
| duration = datetime.datetime.now() - bwc.start_time | ||
| if result is not None: | ||
| reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type] |
There was a problem hiding this comment.
_convert_write_result is also dead code now.
| serviceId=bwc.conn.service_id, | ||
| ) | ||
| if bwc.publish: | ||
| bwc._start(cmd, request_id, docs) |
There was a problem hiding this comment.
_BulkWriteContext._start, _BulkWriteContext._succeed, _BulkWriteContext._fail, and _ClientBulkWriteContext._start are all dead code.
| _IS_SYNC = False | ||
|
|
||
|
|
||
| async def run_command( |
There was a problem hiding this comment.
This number of kwargs, several of which directly gate which network behavior to use, is extremely hard to parse and makes every call site much more complex than before. What if we split those different paths into three public methods that all wrap a private _run_command: one for acknowledged commands, one for unacknowledged commands, and one for cursor commands? That way we still have one unified command execution point, but multiple entry points that don't require a huge list of kwargs on every invocation.
| decrypt_reply=False, | ||
| ) | ||
| reply = result_docs[0] | ||
| # Process the response from the server. |
There was a problem hiding this comment.
We should pass process_response=True and remove this line.
| decrypt_reply=False, | ||
| ) | ||
| reply = result_docs[0] | ||
| # Process the response from the server. |
| published = dict(cmd) | ||
| published["ops"] = op_docs | ||
| published["nsInfo"] = ns_docs | ||
| reply: Mapping[str, Any] = {"ok": 1} |
There was a problem hiding this comment.
We can use the response from run_unacknowledged_command, right?
There was a problem hiding this comment.
I think this method (and _raise_if_not_writable) should be moved to the command runner logic.
| if bwc.publish: | ||
| bwc._fail(request_id, failure, duration) | ||
| # Process the response from the server. | ||
| if isinstance(exc, (NotPrimaryError, OperationFailure)): |
There was a problem hiding this comment.
Should this also be handled by run_acknowledged_command?
| # Top-level error will be embedded in ClientBulkWriteException. | ||
| reply = {"error": exc} | ||
| # Process the response from the server. | ||
| if isinstance(exc, OperationFailure): |
| reply = await conn.receive_message(None) | ||
| elif unacknowledged: | ||
| if use_conn_transport: | ||
| if not conn.is_writable: |
There was a problem hiding this comment.
Hmm, now this is defined twice. Maybe it should be a public method on conn.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
pymongo/synchronous/command_encoder.py:22
- The module reference in this docstring points to
pymongo.command_runner, but there is no top-levelpymongo/command_runner.pyin this codebase. This will produce incorrect Sphinx cross-references; it should point at the synchronous runner module.
| try: | ||
| reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type] | ||
| duration = datetime.datetime.now() - bwc.start_time | ||
| if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): | ||
| _debug_log( | ||
| _COMMAND_LOGGER, | ||
| message=_CommandStatusMessage.SUCCEEDED, | ||
| clientId=client._topology_settings._topology_id, | ||
| durationMS=duration, | ||
| reply=reply, | ||
| commandName=next(iter(cmd)), | ||
| databaseName=bwc.db_name, | ||
| requestId=request_id, | ||
| operationId=request_id, | ||
| driverConnectionId=bwc.conn.id, | ||
| serverConnectionId=bwc.conn.server_connection_id, | ||
| serverHost=bwc.conn.address[0], | ||
| serverPort=bwc.conn.address[1], | ||
| serviceId=bwc.conn.service_id, | ||
| ) | ||
| if bwc.publish: | ||
| bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] | ||
| # Process the response from the server. | ||
| await self.client._process_response(reply, bwc.session) # type: ignore[arg-type] | ||
| result_docs, _, _ = await run_acknowledged_command( | ||
| bwc.conn, # type: ignore[arg-type] | ||
| cmd, | ||
| bwc.db_name, |
There was a problem hiding this comment.
These seem worth investigating. We may need to put _process_response back at the call site, but hopefully not.
There was a problem hiding this comment.
Is it a big deal if we need to but then the telemetry consolidation will resolve it?
There was a problem hiding this comment.
I think they're orthogonal concerns.
There was a problem hiding this comment.
The last commit only added a comment, I don't think it addressed this feedback
There was a problem hiding this comment.
IIUC, we either accept changing the ordering or we put _process_response back at the call site? Can we determine if changing the ordering matters? I would say let's go ahead, but admittedly I don't understand the full context.
There was a problem hiding this comment.
The ordering doesn't bother me as much as the other comment about including the special error gossip of {} vs exc.details when swallowing exceptions, given that the tests still pass.
There was a problem hiding this comment.
The ordering doesn't bother me as much as the other comment about
including the special error gossip of {} vs exc.details when swallowing exceptions, given that the tests still pass.
@NoahStapp suggested we check the spec and the appropriate spec appears to be the driver sessions spec, specifically:
Whenever a driver receives a cluster time from a server it MUST compare it to the current highest seen cluster time for
the deployment. If the new cluster time is higher than the highest seen cluster time it MUST become the new highest seen
cluster time. Two cluster times are compared using only the BsonTimestamp value of theclusterTimeembedded field (be
sure to include both the timestamp and the increment of the BsonTimestamp in the comparison). The signature field does
not participate in the comparison.
Since run_acknowledged_command calls _process_response(response_doc) before it calls _check_command_response, in an OperationFailure (a server error response that carries $clusterTime), gossip already happened inside run_acknowledged_command before the exception propagated to the except block. By the time client_bulk catches it, the spec requirement "MUST compare it to the current highest seen cluster time" is already satisfied. For non-server errors like network failures there's no response document at all, so there's no $clusterTime to gossip — the original _process_response({}) was also a no-op in that case. So I think it's OK to leave it as is.
There was a problem hiding this comment.
Okay sounds good, I say merge and iterate.
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
|
There are some merge conflicts |
rebased |
|
@blink1073 Thanks! Can you merge? I'm blocked because I committed last … or something. |
|
@NoahStapp has a requested change, are you good to merge? |
There was a problem hiding this comment.
Do we actually need this as a separate file from command_runner.py? It contains only one method, and that method just calls methods from command_runner.py after some processing.
There was a problem hiding this comment.
Possibly not … I combined them so we'll find out.
| return docs[0] | ||
| elif operation.name == "explain": | ||
| return docs[0] if docs else {} | ||
| res: dict[str, Any] = { | ||
| "cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, # type: ignore[union-attr] | ||
| "ok": 1, | ||
| } | ||
| if operation.name == "find": | ||
| res["cursor"]["firstBatch"] = docs | ||
| else: | ||
| failure = _convert_exception(exc) | ||
| if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): | ||
| _debug_log( | ||
| _COMMAND_LOGGER, | ||
| message=_CommandStatusMessage.FAILED, | ||
| clientId=client._topology_settings._topology_id, | ||
| durationMS=duration, | ||
| failure=failure, | ||
| commandName=next(iter(cmd)), | ||
| databaseName=dbn, | ||
| requestId=request_id, | ||
| operationId=request_id, | ||
| driverConnectionId=conn.id, | ||
| serverConnectionId=conn.server_connection_id, | ||
| serverHost=conn.address[0], | ||
| serverPort=conn.address[1], | ||
| serviceId=conn.service_id, | ||
| isServerSideError=isinstance(exc, OperationFailure), | ||
| ) | ||
| if publish: | ||
| assert listeners is not None | ||
| listeners.publish_command_failure( | ||
| duration, | ||
| failure, | ||
| operation.name, | ||
| request_id, | ||
| conn.address, | ||
| conn.server_connection_id, | ||
| service_id=conn.service_id, | ||
| database_name=dbn, | ||
| ) | ||
| raise | ||
| duration = datetime.now() - start | ||
| # Must publish in find / getMore / explain command response | ||
| # format. | ||
| res = docs[0] | ||
| if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): | ||
| _debug_log( | ||
| _COMMAND_LOGGER, | ||
| message=_CommandStatusMessage.SUCCEEDED, | ||
| clientId=client._topology_settings._topology_id, | ||
| durationMS=duration, | ||
| reply=res, | ||
| commandName=next(iter(cmd)), | ||
| databaseName=dbn, | ||
| requestId=request_id, | ||
| operationId=request_id, | ||
| driverConnectionId=conn.id, | ||
| serverConnectionId=conn.server_connection_id, | ||
| serverHost=conn.address[0], | ||
| serverPort=conn.address[1], | ||
| serviceId=conn.service_id, | ||
| ) | ||
| if publish: | ||
| assert listeners is not None | ||
| listeners.publish_command_success( | ||
| duration, | ||
| res, | ||
| operation.name, | ||
| request_id, | ||
| conn.address, | ||
| conn.server_connection_id, | ||
| service_id=conn.service_id, | ||
| database_name=dbn, | ||
| ) | ||
|
|
||
| # Decrypt response. | ||
| client = operation.client # type: ignore[assignment] | ||
| if client and client._encrypter: | ||
| if use_cmd: | ||
| decrypted = await client._encrypter.decrypt(reply.raw_command_response()) | ||
| docs = _decode_all_selective(decrypted, operation.codec_options, user_fields) | ||
| res["cursor"]["nextBatch"] = docs |
There was a problem hiding this comment.
self.publish is dead code that we can remove.
There was a problem hiding this comment.
Dead code, can be removed.
| op_id=bwc.op_id, | ||
| command_name=bwc.name, | ||
| use_conn_transport=True, | ||
| decrypt_reply=False, |
There was a problem hiding this comment.
This should have set_conn_more_to_come=False for clarity. The default value for run_acknowledged_command is True.
| max_doc_size: int = 0, | ||
| more_to_come: bool = False, | ||
| set_conn_more_to_come: bool = True, | ||
| is_command_response: bool = True, |
| reply, | ||
| cursor_id, | ||
| codec_options, | ||
| legacy_response=not is_command_response, |
There was a problem hiding this comment.
PYTHON-5713 also makes legacy_response dead code.
After the consolidation, this module no longer does any networking -- the send/receive round trip moved into command_runner.run_command. It now only encodes a command and runs its pre-flight (read preference/concern, collation, $clusterTime, auto-encryption, CSOT, OP_MSG encoding), so 'network' was misleading and collided with the lower-level network_layer.py (raw sockets). Pure rename: git mv the async module (synchro regenerates the sync mirror) and update the two pool.py imports. No behavior change.
PYTHON-5713 removed _OpReply from pymongo/message.py since OP_MSG is now used exclusively. Update command_runner.py and server.py to drop the _OpReply import and simplify Union[_OpReply, _OpMsg] → _OpMsg in all type annotations. Regenerate sync mirrors via synchro.py.
…rocess_response calls
- Add _raise_if_not_writable() to AsyncConnection (and sync mirror);
use it in both pool.write() and command_runner._run_command() so the
NotPrimaryError logic lives in one place.
- bulk.run(): remove the try/except that manually called
_process_response on error — run_acknowledged_command already handles
it via process_response=True.
- client_bulk.run(): same — remove the _process_response calls from the
except block; the {"error": exc} wrapping for ClientBulkWriteException
is kept.
- Merge command_encoder.py into command_runner.py; update pool.py import - Fix _build_reply_doc regression in server.py: always return docs[0] - Remove dead code: is_command_response param from _run_command/run_cursor_command - Remove dead code: legacy_response passthrough in unpack_res call - Remove dead code: self.publish from _BulkWriteContextBase - Add explicit set_conn_more_to_come=False to bulk write_command call Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
pymongo/synchronous/command_encoder.py:22
- The module docstring references
pymongo.command_runner.run_acknowledged_command, but there is nopymongo.command_runnermodule in this codebase. This Sphinx cross-reference will be broken; it should point at the synchronous runner module instead.
|
@NoahStapp @blink1073 GTG here I think … |
There was a problem hiding this comment.
This file shouldn't exist since it's the synchronous version of the deleted pymongo/asynchronous/network.py.
There was a problem hiding this comment.
Did you mean to make this comment on command_encoder.py? I would expect network.py to be deleted in both, and command_runner.py added to both. If you agree, I'll also see about forcing git to recognize the link between the old and new files.
There was a problem hiding this comment.
No, we'd have to do a rebase-merge to preserve the link between the files.
| docs = _decode_all_selective(decrypted, operation.codec_options, user_fields) | ||
| user_fields = _CURSOR_DOC_FIELDS if use_cmd else None | ||
|
|
||
| def _build_reply_doc(docs: list[dict[str, Any]], reply: Optional[_OpMsg]) -> _DocumentOut: # noqa: ARG001 |
There was a problem hiding this comment.
This function can be removed entirely: I don't see another value passed to reply_doc_builder, can't we just inline the docs[0] itself within run_cursor_command?
| return docs, reply, duration | ||
|
|
||
|
|
||
| async def run_acknowledged_command( |
There was a problem hiding this comment.
This and run_unacknowledged_command are nearly identical besides a few kwargs and which fixed kwargs they pass through to _run_command. Will the logging/monitoring consolidation refactor allow us to either make these more distinct or reduce duplication? Or should we figure out a way to do that here in this PR?
There was a problem hiding this comment.
This and
run_unacknowledged_commandare nearly identical besides a few kwargs and which fixed kwargs they pass through to_run_command. Will the logging/monitoring consolidation refactor allow us to either make these more distinct or reduce duplication? Or should we figure out a way to do that here in this PR?
Those were created in response to this, so I'm fine with leaving them for now. Handing off to @blink1073 !
There was a problem hiding this comment.
The current code doesn't resolve my earlier comment that you linked: we still have a huge list of kwargs that's very difficult to parse and makes every call site much more complex than the pre-consolidation code.
There was a problem hiding this comment.
I pushed a refactor, but I still want to look over it in context it before re-requesting review.
There was a problem hiding this comment.
Okay, I'm happy with the current state with run_command, run_bulk_write_command and run_cursor_command. I've minimized the amount of kwargs needed as much as possible. run_command inherits the large number of kwargs in AsyncConnection.command.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (1)
pymongo/synchronous/command_encoder.py:22
- The Sphinx cross-reference in this docstring points to
pymongo.command_runner, but there is no top-levelpymongo/command_runner.pyin this codebase. This will render as a broken reference; it should point at the synchronous runner module that actually providesrun_acknowledged_command.
Conflicts resolved: - bulk.py / client_bulk.py (async + sync): dropped datetime/logging imports removed by our refactor; kept Iterator/Mapping imports added upstream - message.py: kept our removal of dead _convert_write_result and publish slot; kept upstream's alphabetized __slots__ and new codec slot - network.py (modify/delete): kept our deletion (absorbed into command_runner.py) - command_encoder.py (sync rename/delete): kept our deletion - command_runner.py: applied upstream's collections.abc import modernization from network.py
Remove transport-selection kwargs (use_conn_transport, decrypt_reply, set_conn_more_to_come) from the bulk write call sites by introducing run_bulk_write_command, which hardcodes the correct values for the bulk write path.
…ed/unacknowledged_command The two wrapper functions were only ever called from command() in the same file. Collapse the if/else into a single run_command call using process_response=not unacknowledged and decrypt_reply=not unacknowledged.
…te_command Define a Protocol in command_runner.py capturing the fields that bulk write contexts supply (conn, db_name, session, listeners, start_time, codec, op_id, name). run_bulk_write_command now takes a bwc context object plus the four per-call arguments (cmd, request_id, msg, client), reducing call sites from ~13 arguments to ~5.
…lic wrappers _run_command holds all transport/response flags and is called only by the three public entry points. run_command exposes a clean interface for the standard network-transport path (deriving process_response and decrypt_reply from unacknowledged), run_bulk_write_command and run_cursor_command call _run_command directly with their own hardcoded flags.
Remove the thin run_command wrapper and rename command() to run_command() so there is a single public entry point for encoding and executing commands. pool.py updated to import and call run_command instead of command.
PYTHON-5676
Changes in this PR
Consolidates command execution into a single code path — the ticket's definition of done ("all database operations are done from a central location").
Previously the same execution skeleton (start clock → publish STARTED event → send → receive → process response → check for errors → publish SUCCEEDED/FAILED event) was duplicated across multiple call sites. This PR introduces
pymongo/asynchronous/command_runner.py(with a sync mirror auto-generated by synchro) whose private_run_command()owns that entire skeleton: APM event publishing, logging, the send/receive round trip,$clusterTimegossip, response processing, error checking, and auto-encryption decryption.The public API surface of the new module is:
run_command()— high-level entry point used bypool.py. Takes a rawspecdict, applies read concern, collation,$clusterTime, compression, auto-encryption, and CSOT, encodes it as OP_MSG, and delegates to_run_command.run_bulk_write_command()— entry point for collection and client bulk writes. Accepts a_BulkWriteContextProto(a Protocol capturing the fields needed from_BulkWriteContextBase) rather than individual connection/session/listener arguments, and passes bulk-specific flags to_run_command.run_cursor_command()— entry point for cursor-returning commands inserver.py. Passes cursor-specific flags (exhaustmore_to_come,unpack_res,cursor_id) to_run_command.All call sites now route through these functions. No public API changes.
Test Plan
No new tests: this is a behavior-preserving refactor. The existing APM/command-monitoring, command-logging, bulk write, and CRUD spec suites assert the exact event and log documents that any regression would change.
Checklist
Checklist for Author
Checklist for Reviewer