Skip to content

PYTHON-5676 Consolidate command execution logic#2852

Open
aclark4life wants to merge 24 commits into
mongodb:masterfrom
aclark4life:PYTHON-5676
Open

PYTHON-5676 Consolidate command execution logic#2852
aclark4life wants to merge 24 commits into
mongodb:masterfrom
aclark4life:PYTHON-5676

Conversation

@aclark4life

@aclark4life aclark4life commented Jun 4, 2026

Copy link
Copy Markdown
Contributor

PYTHON-5676

Changes in this PR

Consolidates command execution into a single code path — the ticket's definition of done ("all database operations are done from a central location").

Previously the same execution skeleton (start clock → publish STARTED event → send → receive → process response → check for errors → publish SUCCEEDED/FAILED event) was duplicated across multiple call sites. This PR introduces pymongo/asynchronous/command_runner.py (with a sync mirror auto-generated by synchro) whose private _run_command() owns that entire skeleton: APM event publishing, logging, the send/receive round trip, $clusterTime gossip, response processing, error checking, and auto-encryption decryption.

The public API surface of the new module is:

  • run_command() — high-level entry point used by pool.py. Takes a raw spec dict, applies read concern, collation, $clusterTime, compression, auto-encryption, and CSOT, encodes it as OP_MSG, and delegates to _run_command.
  • run_bulk_write_command() — entry point for collection and client bulk writes. Accepts a _BulkWriteContextProto (a Protocol capturing the fields needed from _BulkWriteContextBase) rather than individual connection/session/listener arguments, and passes bulk-specific flags to _run_command.
  • run_cursor_command() — entry point for cursor-returning commands in server.py. Passes cursor-specific flags (exhaust more_to_come, unpack_res, cursor_id) to _run_command.

All call sites now route through these functions. No public API changes.

Test Plan

No new tests: this is a behavior-preserving refactor. The existing APM/command-monitoring, command-logging, bulk write, and CRUD spec suites assert the exact event and log documents that any regression would change.

Checklist

Checklist for Author

  • Did you update the changelog (if necessary)? — Not necessary; internal refactor with no user-facing behavior change.
  • Is there test coverage? — Covered by existing spec suites (behavior preserved).
  • Is any followup work tracked in a JIRA ticket? If so, add link(s).

Checklist for Reviewer

  • Does the title of the PR reference a JIRA Ticket?
  • Do you fully understand the implementation? (Would you be comfortable explaining how this code works to someone else?)
  • Is all relevant documentation (README or docstring) updated?

@codecov-commenter

codecov-commenter commented Jun 4, 2026

Copy link
Copy Markdown

@aclark4life aclark4life marked this pull request as ready for review June 5, 2026 13:56
@aclark4life aclark4life requested a review from a team as a code owner June 5, 2026 13:56
@aclark4life aclark4life requested review from NoahStapp and Copilot June 5, 2026 13:56
@aclark4life

aclark4life commented Jun 5, 2026

Copy link
Copy Markdown
Contributor Author

Failures related to #2853

@aclark4life aclark4life requested a review from blink1073 June 5, 2026 14:01

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR consolidates MongoDB command execution (logging + command monitoring/APM publishing + send/receive + response processing/checking + clusterTime gossip + optional decryption) into a single shared implementation (run_command) and updates the major call sites (standard command path, cursor operations, bulk writes) to route through it. This reduces duplicated “command execution skeleton” logic while keeping behavioral compatibility.

Changes:

  • Introduces pymongo.{a,}synchronous.command_runner.run_command() as the central command execution path.
  • Refactors Server.run_operation() (cursor operations) and bulk/client-bulk write paths to use run_command() with hooks for legacy reply shaping and ordering constraints.
  • Renames/repurposes the former network command module into command_encoder to reflect its role as an encoder + pre-flight layer (with the actual I/O handled by run_command / network_layer).

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated no comments.

Show a summary per file
File Description
pymongo/asynchronous/command_runner.py Adds the async shared run_command() implementation (logging/APM + transport + response handling).
pymongo/synchronous/command_runner.py Adds the sync shared run_command() implementation (mirror of async via synchro).
pymongo/asynchronous/command_encoder.py Routes standard async command execution through run_command(); updates module docstring.
pymongo/synchronous/command_encoder.py Routes standard sync command execution through run_command(); updates module docstring.
pymongo/asynchronous/server.py Refactors async cursor operations to use run_command() with reply shaping for monitoring.
pymongo/synchronous/server.py Refactors sync cursor operations to use run_command() with reply shaping for monitoring.
pymongo/asynchronous/bulk.py Refactors async bulk write command/unack paths to use run_command() and preserve ordering.
pymongo/synchronous/bulk.py Refactors sync bulk write command/unack paths to use run_command() and preserve ordering.
pymongo/asynchronous/client_bulk.py Refactors async client-bulk write command/unack paths to use run_command() while preserving top-level error embedding behavior.
pymongo/synchronous/client_bulk.py Refactors sync client-bulk write command/unack paths to use run_command() while preserving top-level error embedding behavior.
pymongo/asynchronous/pool.py Updates import to use command_encoder.command after module rename.
pymongo/synchronous/pool.py Updates import to use command_encoder.command after module rename.
Comments suppressed due to low confidence (1)

pymongo/synchronous/command_encoder.py:21

  • Docstring references pymongo.command_runner.run_command, but there is no top-level pymongo.command_runner module in this codebase. Since this file imports run_command from pymongo.synchronous.command_runner, the Sphinx reference should match to avoid broken cross-references / confusion.

if bwc.publish:
bwc._start(cmd, request_id, op_docs, ns_docs)
try:
reply = bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AsyncConnection.write_command doesn't appear to have any remaining callers, so it (and its sync counterpart) are dead code that should be removed.

result = await bwc.conn.unack_write(msg, max_doc_size) # type: ignore[func-returns-value, misc, override]
duration = datetime.datetime.now() - bwc.start_time
if result is not None:
reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_convert_write_result is also dead code now.

serviceId=bwc.conn.service_id,
)
if bwc.publish:
bwc._start(cmd, request_id, docs)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_BulkWriteContext._start, _BulkWriteContext._succeed, _BulkWriteContext._fail, and _ClientBulkWriteContext._start are all dead code.

Comment thread pymongo/asynchronous/command_runner.py Outdated
_IS_SYNC = False


async def run_command(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This number of kwargs, several of which directly gate which network behavior to use, is extremely hard to parse and makes every call site much more complex than before. What if we split those different paths into three public methods that all wrap a private _run_command: one for acknowledged commands, one for unacknowledged commands, and one for cursor commands? That way we still have one unified command execution point, but multiple entry points that don't require a huge list of kwargs on every invocation.

Comment thread pymongo/asynchronous/bulk.py Outdated
decrypt_reply=False,
)
reply = result_docs[0]
# Process the response from the server.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should pass process_response=True and remove this line.

Comment thread pymongo/asynchronous/client_bulk.py Outdated
decrypt_reply=False,
)
reply = result_docs[0]
# Process the response from the server.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

Comment thread pymongo/asynchronous/client_bulk.py Outdated
published = dict(cmd)
published["ops"] = op_docs
published["nsInfo"] = ns_docs
reply: Mapping[str, Any] = {"ok": 1}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use the response from run_unacknowledged_command, right?

Comment thread pymongo/asynchronous/pool.py Outdated

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method (and _raise_if_not_writable) should be moved to the command runner logic.

Comment thread pymongo/asynchronous/bulk.py Outdated
if bwc.publish:
bwc._fail(request_id, failure, duration)
# Process the response from the server.
if isinstance(exc, (NotPrimaryError, OperationFailure)):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this also be handled by run_acknowledged_command?

Comment thread pymongo/asynchronous/client_bulk.py Outdated
# Top-level error will be embedded in ClientBulkWriteException.
reply = {"error": exc}
# Process the response from the server.
if isinstance(exc, OperationFailure):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question

Comment thread pymongo/asynchronous/command_runner.py Outdated
reply = await conn.receive_message(None)
elif unacknowledged:
if use_conn_transport:
if not conn.is_writable:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, now this is defined twice. Maybe it should be a public method on conn.

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (1)

pymongo/synchronous/command_encoder.py:22

  • The module reference in this docstring points to pymongo.command_runner, but there is no top-level pymongo/command_runner.py in this codebase. This will produce incorrect Sphinx cross-references; it should point at the synchronous runner module.

Comment thread pymongo/asynchronous/bulk.py Outdated
Comment thread pymongo/asynchronous/client_bulk.py Outdated
Comment on lines +241 to +245
try:
reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type]
duration = datetime.datetime.now() - bwc.start_time
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=reply,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
if bwc.publish:
bwc._succeed(request_id, reply, duration) # type: ignore[arg-type]
# Process the response from the server.
await self.client._process_response(reply, bwc.session) # type: ignore[arg-type]
result_docs, _, _ = await run_acknowledged_command(
bwc.conn, # type: ignore[arg-type]
cmd,
bwc.db_name,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These seem worth investigating. We may need to put _process_response back at the call site, but hopefully not.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a big deal if we need to but then the telemetry consolidation will resolve it?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they're orthogonal concerns.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The last commit only added a comment, I don't think it addressed this feedback

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, we either accept changing the ordering or we put _process_response back at the call site? Can we determine if changing the ordering matters? I would say let's go ahead, but admittedly I don't understand the full context.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ordering doesn't bother me as much as the other comment about including the special error gossip of {} vs exc.details when swallowing exceptions, given that the tests still pass.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ordering doesn't bother me as much as the other comment about including the special error gossip of {} vs exc.details when swallowing exceptions, given that the tests still pass.

@NoahStapp suggested we check the spec and the appropriate spec appears to be the driver sessions spec, specifically:

Whenever a driver receives a cluster time from a server it MUST compare it to the current highest seen cluster time for
the deployment. If the new cluster time is higher than the highest seen cluster time it MUST become the new highest seen
cluster time. Two cluster times are compared using only the BsonTimestamp value of the clusterTime embedded field (be
sure to include both the timestamp and the increment of the BsonTimestamp in the comparison). The signature field does
not participate in the comparison.

Since run_acknowledged_command calls _process_response(response_doc) before it calls _check_command_response, in an OperationFailure (a server error response that carries $clusterTime), gossip already happened inside run_acknowledged_command before the exception propagated to the except block. By the time client_bulk catches it, the spec requirement "MUST compare it to the current highest seen cluster time" is already satisfied. For non-server errors like network failures there's no response document at all, so there's no $clusterTime to gossip — the original _process_response({}) was also a no-op in that case. So I think it's OK to leave it as is.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay sounds good, I say merge and iterate.

@codecov-commenter

codecov-commenter commented Jun 10, 2026

Copy link
Copy Markdown

@blink1073

Copy link
Copy Markdown
Member

There are some merge conflicts

@aclark4life

Copy link
Copy Markdown
Contributor Author

There are some merge conflicts

rebased

Comment thread pymongo/asynchronous/pool.py Outdated
blink1073
blink1073 previously approved these changes Jun 11, 2026
@aclark4life

Copy link
Copy Markdown
Contributor Author

@blink1073 Thanks! Can you merge? I'm blocked because I committed last … or something.

@blink1073

Copy link
Copy Markdown
Member

@NoahStapp has a requested change, are you good to merge?

Comment thread pymongo/asynchronous/command_encoder.py Outdated

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we actually need this as a separate file from command_runner.py? It contains only one method, and that method just calls methods from command_runner.py after some processing.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly not … I combined them so we'll find out.

Comment thread pymongo/asynchronous/server.py Outdated
Comment on lines +175 to +185
return docs[0]
elif operation.name == "explain":
return docs[0] if docs else {}
res: dict[str, Any] = {
"cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, # type: ignore[union-attr]
"ok": 1,
}
if operation.name == "find":
res["cursor"]["firstBatch"] = docs
else:
failure = _convert_exception(exc)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=dbn,
requestId=request_id,
operationId=request_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
if publish:
assert listeners is not None
listeners.publish_command_failure(
duration,
failure,
operation.name,
request_id,
conn.address,
conn.server_connection_id,
service_id=conn.service_id,
database_name=dbn,
)
raise
duration = datetime.now() - start
# Must publish in find / getMore / explain command response
# format.
res = docs[0]
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=res,
commandName=next(iter(cmd)),
databaseName=dbn,
requestId=request_id,
operationId=request_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
)
if publish:
assert listeners is not None
listeners.publish_command_success(
duration,
res,
operation.name,
request_id,
conn.address,
conn.server_connection_id,
service_id=conn.service_id,
database_name=dbn,
)

# Decrypt response.
client = operation.client # type: ignore[assignment]
if client and client._encrypter:
if use_cmd:
decrypted = await client._encrypter.decrypt(reply.raw_command_response())
docs = _decode_all_selective(decrypted, operation.codec_options, user_fields)
res["cursor"]["nextBatch"] = docs

@NoahStapp NoahStapp Jun 12, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block is a regression from master.

Comment thread pymongo/message.py Outdated

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.publish is dead code that we can remove.

Comment thread pymongo/message.py Outdated

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dead code, can be removed.

Comment thread pymongo/asynchronous/bulk.py Outdated
op_id=bwc.op_id,
command_name=bwc.name,
use_conn_transport=True,
decrypt_reply=False,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should have set_conn_more_to_come=False for clarity. The default value for run_acknowledged_command is True.

Comment thread pymongo/asynchronous/command_runner.py Outdated
max_doc_size: int = 0,
more_to_come: bool = False,
set_conn_more_to_come: bool = True,
is_command_response: bool = True,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dead code now that PYTHON-5713 is done.

Comment thread pymongo/asynchronous/command_runner.py Outdated
reply,
cursor_id,
codec_options,
legacy_response=not is_command_response,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PYTHON-5713 also makes legacy_response dead code.

aclark4life and others added 12 commits June 15, 2026 16:08
After the consolidation, this module no longer does any networking -- the
send/receive round trip moved into command_runner.run_command. It now only
encodes a command and runs its pre-flight (read preference/concern, collation,
$clusterTime, auto-encryption, CSOT, OP_MSG encoding), so 'network' was
misleading and collided with the lower-level network_layer.py (raw sockets).

Pure rename: git mv the async module (synchro regenerates the sync mirror) and
update the two pool.py imports. No behavior change.
PYTHON-5713 removed _OpReply from pymongo/message.py since OP_MSG is
now used exclusively. Update command_runner.py and server.py to drop
the _OpReply import and simplify Union[_OpReply, _OpMsg] → _OpMsg in
all type annotations. Regenerate sync mirrors via synchro.py.
…rocess_response calls

- Add _raise_if_not_writable() to AsyncConnection (and sync mirror);
  use it in both pool.write() and command_runner._run_command() so the
  NotPrimaryError logic lives in one place.
- bulk.run(): remove the try/except that manually called
  _process_response on error — run_acknowledged_command already handles
  it via process_response=True.
- client_bulk.run(): same — remove the _process_response calls from the
  except block; the {"error": exc} wrapping for ClientBulkWriteException
  is kept.
- Merge command_encoder.py into command_runner.py; update pool.py import
- Fix _build_reply_doc regression in server.py: always return docs[0]
- Remove dead code: is_command_response param from _run_command/run_cursor_command
- Remove dead code: legacy_response passthrough in unpack_res call
- Remove dead code: self.publish from _BulkWriteContextBase
- Add explicit set_conn_more_to_come=False to bulk write_command call

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (1)

pymongo/synchronous/command_encoder.py:22

  • The module docstring references pymongo.command_runner.run_acknowledged_command, but there is no pymongo.command_runner module in this codebase. This Sphinx cross-reference will be broken; it should point at the synchronous runner module instead.

Comment thread pymongo/asynchronous/command_runner.py
Comment thread pymongo/synchronous/command_runner.py
@aclark4life

Copy link
Copy Markdown
Contributor Author

@NoahStapp @blink1073 GTG here I think …

@aclark4life aclark4life requested a review from Copilot June 16, 2026 15:10

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file shouldn't exist since it's the synchronous version of the deleted pymongo/asynchronous/network.py.

@blink1073 blink1073 Jun 16, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean to make this comment on command_encoder.py? I would expect network.py to be deleted in both, and command_runner.py added to both. If you agree, I'll also see about forcing git to recognize the link between the old and new files.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, sorry.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we'd have to do a rebase-merge to preserve the link between the files.

Comment thread pymongo/asynchronous/server.py Outdated
docs = _decode_all_selective(decrypted, operation.codec_options, user_fields)
user_fields = _CURSOR_DOC_FIELDS if use_cmd else None

def _build_reply_doc(docs: list[dict[str, Any]], reply: Optional[_OpMsg]) -> _DocumentOut: # noqa: ARG001

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function can be removed entirely: I don't see another value passed to reply_doc_builder, can't we just inline the docs[0] itself within run_cursor_command?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment thread pymongo/asynchronous/command_runner.py Outdated
return docs, reply, duration


async def run_acknowledged_command(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and run_unacknowledged_command are nearly identical besides a few kwargs and which fixed kwargs they pass through to _run_command. Will the logging/monitoring consolidation refactor allow us to either make these more distinct or reduce duplication? Or should we figure out a way to do that here in this PR?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and run_unacknowledged_command are nearly identical besides a few kwargs and which fixed kwargs they pass through to _run_command. Will the logging/monitoring consolidation refactor allow us to either make these more distinct or reduce duplication? Or should we figure out a way to do that here in this PR?

Those were created in response to this, so I'm fine with leaving them for now. Handing off to @blink1073 !

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current code doesn't resolve my earlier comment that you linked: we still have a huge list of kwargs that's very difficult to parse and makes every call site much more complex than the pre-consolidation code.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed a refactor, but I still want to look over it in context it before re-requesting review.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I'm happy with the current state with run_command, run_bulk_write_command and run_cursor_command. I've minimized the amount of kwargs needed as much as possible. run_command inherits the large number of kwargs in AsyncConnection.command.

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 13 out of 13 changed files in this pull request and generated 4 comments.

Comments suppressed due to low confidence (1)

pymongo/synchronous/command_encoder.py:22

  • The Sphinx cross-reference in this docstring points to pymongo.command_runner, but there is no top-level pymongo/command_runner.py in this codebase. This will render as a broken reference; it should point at the synchronous runner module that actually provides run_acknowledged_command.

Comment thread pymongo/synchronous/server.py
Comment thread pymongo/asynchronous/server.py
Comment thread pymongo/synchronous/bulk.py Outdated
Comment thread pymongo/asynchronous/bulk.py Outdated
Conflicts resolved:
- bulk.py / client_bulk.py (async + sync): dropped datetime/logging imports
  removed by our refactor; kept Iterator/Mapping imports added upstream
- message.py: kept our removal of dead _convert_write_result and publish slot;
  kept upstream's alphabetized __slots__ and new codec slot
- network.py (modify/delete): kept our deletion (absorbed into command_runner.py)
- command_encoder.py (sync rename/delete): kept our deletion
- command_runner.py: applied upstream's collections.abc import modernization
  from network.py
Remove transport-selection kwargs (use_conn_transport, decrypt_reply,
set_conn_more_to_come) from the bulk write call sites by introducing
run_bulk_write_command, which hardcodes the correct values for the bulk
write path.
…ed/unacknowledged_command

The two wrapper functions were only ever called from command() in the same
file. Collapse the if/else into a single run_command call using
process_response=not unacknowledged and decrypt_reply=not unacknowledged.
…te_command

Define a Protocol in command_runner.py capturing the fields that bulk write
contexts supply (conn, db_name, session, listeners, start_time, codec, op_id,
name). run_bulk_write_command now takes a bwc context object plus the four
per-call arguments (cmd, request_id, msg, client), reducing call sites from
~13 arguments to ~5.
…lic wrappers

_run_command holds all transport/response flags and is called only by the
three public entry points. run_command exposes a clean interface for the
standard network-transport path (deriving process_response and decrypt_reply
from unacknowledged), run_bulk_write_command and run_cursor_command call
_run_command directly with their own hardcoded flags.
Remove the thin run_command wrapper and rename command() to run_command()
so there is a single public entry point for encoding and executing commands.
pool.py updated to import and call run_command instead of command.
@blink1073 blink1073 requested a review from NoahStapp June 16, 2026 23:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants