diff --git a/pymongo/asynchronous/bulk.py b/pymongo/asynchronous/bulk.py index 4a54f9eb3f..1ff6475673 100644 --- a/pymongo/asynchronous/bulk.py +++ b/pymongo/asynchronous/bulk.py @@ -19,8 +19,6 @@ from __future__ import annotations import copy -import datetime -import logging from collections.abc import MutableMapping from itertools import islice from typing import ( @@ -57,18 +55,17 @@ OperationFailure, ) from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import ( _DELETE, _INSERT, _UPDATE, _BulkWriteContext, - _convert_exception, _convert_write_result, _EncryptedBulkWriteContext, _randint, ) from pymongo.read_preferences import ReadPreference +from pymongo.telemetry import command_telemetry from pymongo.write_concern import WriteConcern if TYPE_CHECKING: @@ -252,78 +249,31 @@ async def write_command( ) -> dict[str, Any]: """A proxy for SocketInfo.write_command that handles event publishing.""" cmd[bwc.field] = docs - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._start(cmd, request_id, docs) - try: - reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc] - duration = datetime.datetime.now() - bwc.start_time - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] - await client._process_response(reply, bwc.session) # type: ignore[arg-type] - except Exception as exc: - duration = datetime.datetime.now() - bwc.start_time - if isinstance(exc, (NotPrimaryError, OperationFailure)): - failure: _DocumentOut = exc.details # type: ignore[assignment] - else: - failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - - if bwc.publish: - bwc._fail(request_id, failure, duration) - # Process the response from the server. - if isinstance(exc, (NotPrimaryError, OperationFailure)): - await client._process_response(exc.details, bwc.session) # type: ignore[arg-type] - raise + with command_telemetry( + command_name=bwc.name, + database_name=bwc.db_name, + spec=cmd, + driver_connection_id=bwc.conn.id, + server_connection_id=bwc.conn.server_connection_id, + publish_event=bwc.publish, + start_time=bwc.start_time, + address=bwc.conn.address, + listeners=bwc.listeners, + client=client, + request_id=request_id, + service_id=bwc.conn.service_id, + operation_id=bwc.op_id, + ) as telemetry: + try: + reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc] + telemetry.publish_succeeded(reply) # type: ignore[misc, arg-type] + await client._process_response(reply, bwc.session) # type: ignore[arg-type] + except Exception as exc: + telemetry.publish_failed(exc) + # Process the response from the server. + if isinstance(exc, (NotPrimaryError, OperationFailure)): + await client._process_response(exc.details, bwc.session) # type: ignore[arg-type] + raise return reply # type: ignore[return-value] async def unack_write( @@ -337,81 +287,33 @@ async def unack_write( client: AsyncMongoClient[Any], ) -> Optional[Mapping[str, Any]]: """A proxy for AsyncConnection.unack_write that handles event publishing.""" - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - cmd = bwc._start(cmd, request_id, docs) - try: - result = await bwc.conn.unack_write(msg, max_doc_size) # type: ignore[func-returns-value, misc, override] - duration = datetime.datetime.now() - bwc.start_time - if result is not None: - reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type] - else: - # Comply with APM spec. - reply = {"ok": 1} - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._succeed(request_id, reply, duration) - except Exception as exc: - duration = datetime.datetime.now() - bwc.start_time - if isinstance(exc, OperationFailure): - failure: _DocumentOut = _convert_write_result(bwc.name, cmd, exc.details) # type: ignore[arg-type] - elif isinstance(exc, NotPrimaryError): - failure = exc.details # type: ignore[assignment] - else: - failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - if bwc.publish: - assert bwc.start_time is not None - bwc._fail(request_id, failure, duration) - raise + cmd[bwc.field] = docs + with command_telemetry( + command_name=bwc.name, + database_name=bwc.db_name, + spec=cmd, + driver_connection_id=bwc.conn.id, + server_connection_id=bwc.conn.server_connection_id, + publish_event=bwc.publish, + start_time=bwc.start_time, + address=bwc.conn.address, + listeners=bwc.listeners, + client=client, + request_id=request_id, + service_id=bwc.conn.service_id, + operation_id=bwc.op_id, + ) as telemetry: + try: + result = await bwc.conn.unack_write(msg, max_doc_size) # type: ignore[func-returns-value, misc, override] + if result is not None: + reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type] + else: + # Comply with APM spec. + reply = {"ok": 1} + telemetry.publish_succeeded(reply) # type: ignore[misc, arg-type] + except Exception as exc: + telemetry.publish_failed(exc) + raise return result # type: ignore[return-value] async def _execute_batch_unack( diff --git a/pymongo/asynchronous/client_bulk.py b/pymongo/asynchronous/client_bulk.py index 151942c8a8..d978599c24 100644 --- a/pymongo/asynchronous/client_bulk.py +++ b/pymongo/asynchronous/client_bulk.py @@ -19,8 +19,6 @@ from __future__ import annotations import copy -import datetime -import logging from collections.abc import MutableMapping from itertools import islice from typing import ( @@ -62,11 +60,9 @@ WaitQueueTimeoutError, ) from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import ( _ClientBulkWriteContext, _convert_client_bulk_exception, - _convert_exception, _convert_write_result, _randint, ) @@ -77,6 +73,7 @@ InsertOneResult, UpdateResult, ) +from pymongo.telemetry import command_telemetry from pymongo.typings import _DocumentOut, _Pipeline from pymongo.write_concern import WriteConcern @@ -238,82 +235,35 @@ async def write_command( """A proxy for AsyncConnection.write_command that handles event publishing.""" cmd["ops"] = op_docs cmd["nsInfo"] = ns_docs - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._start(cmd, request_id, op_docs, ns_docs) - try: - reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type] - duration = datetime.datetime.now() - bwc.start_time - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] - # Process the response from the server. - await self.client._process_response(reply, bwc.session) # type: ignore[arg-type] - except Exception as exc: - duration = datetime.datetime.now() - bwc.start_time - if isinstance(exc, (NotPrimaryError, OperationFailure)): - failure: _DocumentOut = exc.details # type: ignore[assignment] - else: - failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - - if bwc.publish: - bwc._fail(request_id, failure, duration) - # Top-level error will be embedded in ClientBulkWriteException. - reply = {"error": exc} - # Process the response from the server. - if isinstance(exc, OperationFailure): - await self.client._process_response(exc.details, bwc.session) # type: ignore[arg-type] - else: - await self.client._process_response({}, bwc.session) # type: ignore[arg-type] + with command_telemetry( + command_name=bwc.name, + database_name=bwc.db_name, + spec=cmd, + driver_connection_id=bwc.conn.id, + server_connection_id=bwc.conn.server_connection_id, + publish_event=bwc.publish, + start_time=bwc.start_time, + address=bwc.conn.address, + listeners=bwc.listeners, + client=client, + request_id=request_id, + service_id=bwc.conn.service_id, + operation_id=bwc.op_id, + ) as telemetry: + try: + reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type] + telemetry.publish_succeeded(reply) # type: ignore[misc, arg-type] + # Process the response from the server. + await self.client._process_response(reply, bwc.session) # type: ignore[arg-type] + except Exception as exc: + telemetry.publish_failed(exc) + # Top-level error will be embedded in ClientBulkWriteException. + reply = {"error": exc} + # Process the response from the server. + if isinstance(exc, OperationFailure): + await self.client._process_response(exc.details, bwc.session) # type: ignore[arg-type] + else: + await self.client._process_response({}, bwc.session) # type: ignore[arg-type] return reply # type: ignore[return-value] async def unack_write( @@ -327,82 +277,35 @@ async def unack_write( client: AsyncMongoClient[Any], ) -> Optional[Mapping[str, Any]]: """A proxy for AsyncConnection.unack_write that handles event publishing.""" - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - cmd = bwc._start(cmd, request_id, op_docs, ns_docs) - try: - result = await bwc.conn.unack_write(msg, bwc.max_bson_size) # type: ignore[func-returns-value, misc, override] - duration = datetime.datetime.now() - bwc.start_time - if result is not None: - reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type] - else: - # Comply with APM spec. - reply = {"ok": 1} - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._succeed(request_id, reply, duration) - except Exception as exc: - duration = datetime.datetime.now() - bwc.start_time - if isinstance(exc, OperationFailure): - failure: _DocumentOut = _convert_write_result(bwc.name, cmd, exc.details) # type: ignore[arg-type] - elif isinstance(exc, NotPrimaryError): - failure = exc.details # type: ignore[assignment] - else: - failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - if bwc.publish: - assert bwc.start_time is not None - bwc._fail(request_id, failure, duration) - # Top-level error will be embedded in ClientBulkWriteException. - reply = {"error": exc} + cmd["ops"] = op_docs + cmd["nsInfo"] = ns_docs + with command_telemetry( + command_name=bwc.name, + database_name=bwc.db_name, + spec=cmd, + driver_connection_id=bwc.conn.id, + server_connection_id=bwc.conn.server_connection_id, + publish_event=bwc.publish, + start_time=bwc.start_time, + address=bwc.conn.address, + listeners=bwc.listeners, + client=client, + request_id=request_id, + service_id=bwc.conn.service_id, + operation_id=bwc.op_id, + ) as telemetry: + try: + result = await bwc.conn.unack_write(msg, bwc.max_bson_size) # type: ignore[func-returns-value, misc, override] + if result is not None: + reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type] + else: + # Comply with APM spec. + reply = {"ok": 1} + telemetry.publish_succeeded(reply) # type: ignore[misc, arg-type] + except Exception as exc: + telemetry.publish_failed(exc) + # Top-level error will be embedded in ClientBulkWriteException. + reply = {"error": exc} return reply async def _execute_batch_unack( diff --git a/pymongo/asynchronous/network.py b/pymongo/asynchronous/network.py index 5a5dc7fa2c..d5aa98f3cb 100644 --- a/pymongo/asynchronous/network.py +++ b/pymongo/asynchronous/network.py @@ -16,7 +16,6 @@ from __future__ import annotations import datetime -import logging from typing import ( TYPE_CHECKING, Any, @@ -31,17 +30,13 @@ from bson import _decode_all_selective from pymongo import _csot, helpers_shared, message from pymongo.compression_support import _NO_COMPRESSION -from pymongo.errors import ( - NotPrimaryError, - OperationFailure, -) -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import _OpMsg from pymongo.monitoring import _is_speculative_authenticate from pymongo.network_layer import ( async_receive_message, async_sendall, ) +from pymongo.telemetry import command_telemetry if TYPE_CHECKING: from bson import CodecOptions @@ -159,140 +154,62 @@ async def command( if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD: message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD) - if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=spec, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) - if publish: - assert listeners is not None - assert address is not None - listeners.publish_command_start( - orig, - dbname, - request_id, - address, - conn.server_connection_id, - service_id=conn.service_id, - ) - try: - await async_sendall(conn.conn.get_conn, msg) - if use_op_msg and unacknowledged: - # Unacknowledged, fake a successful command response. - reply = None - response_doc: _DocumentOut = {"ok": 1} - else: - reply = await async_receive_message(conn, request_id) - conn.more_to_come = reply.more_to_come - unpacked_docs = reply.unpack_response( - codec_options=codec_options, user_fields=user_fields - ) - - response_doc = unpacked_docs[0] - if not conn.ready: - cluster_time = response_doc.get("$clusterTime") - if cluster_time: - conn._cluster_time = cluster_time - if client: - await client._process_response(response_doc, session) - if check: - helpers_shared._check_command_response( - response_doc, - conn.max_wire_version, - allowable_errors, - parse_write_concern_error=parse_write_concern_error, - ) - except Exception as exc: - duration = datetime.datetime.now() - start - if isinstance(exc, (NotPrimaryError, OperationFailure)): - failure: _DocumentOut = exc.details # type: ignore[assignment] - else: - failure = message._convert_exception(exc) - if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), + with command_telemetry( + command_name=name, + database_name=dbname, + spec=spec, + address=address if address else conn.address, + driver_connection_id=conn.id, + server_connection_id=conn.server_connection_id, + publish_event=publish, + start_time=start, + client=client, + listeners=listeners, + request_id=request_id, + service_id=conn.service_id, + ) as telemetry: + try: + await async_sendall(conn.conn.get_conn, msg) + if use_op_msg and unacknowledged: + # Unacknowledged, fake a successful command response. + reply = None + response_doc: _DocumentOut = {"ok": 1} + else: + reply = await async_receive_message(conn, request_id) + conn.more_to_come = reply.more_to_come + unpacked_docs = reply.unpack_response( + codec_options=codec_options, user_fields=user_fields ) - if publish: - assert listeners is not None - assert address is not None - listeners.publish_command_failure( - duration, - failure, - name, - request_id, - address, - conn.server_connection_id, - service_id=conn.service_id, - database_name=dbname, - ) - raise - duration = datetime.datetime.now() - start - if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=response_doc, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - speculative_authenticate="speculativeAuthenticate" in orig, - ) - if publish: - assert listeners is not None - assert address is not None - listeners.publish_command_success( - duration, - response_doc, - name, - request_id, - address, - conn.server_connection_id, - service_id=conn.service_id, + + response_doc = unpacked_docs[0] + if not conn.ready: + cluster_time = response_doc.get("$clusterTime") + if cluster_time: + conn._cluster_time = cluster_time + if client: + await client._process_response(response_doc, session) + if check: + helpers_shared._check_command_response( + response_doc, + conn.max_wire_version, + allowable_errors, + parse_write_concern_error=parse_write_concern_error, + ) + except Exception as exc: + telemetry.publish_failed(exc) + raise + + telemetry.publish_succeeded( + reply=response_doc, speculative_hello=speculative_hello, - database_name=dbname, + speculative_authenticate="speculativeAuthenticate" in orig, ) - if client and client._encrypter and reply: - decrypted = await client._encrypter.decrypt(reply.raw_command_response()) - response_doc = cast( - "_DocumentOut", _decode_all_selective(decrypted, codec_options, user_fields)[0] - ) + if client and client._encrypter and reply: + decrypted = await client._encrypter.decrypt(reply.raw_command_response()) + response_doc = cast( + "_DocumentOut", _decode_all_selective(decrypted, codec_options, user_fields)[0] + ) - return response_doc # type: ignore[return-value] + return response_doc # type: ignore[return-value] diff --git a/pymongo/asynchronous/server.py b/pymongo/asynchronous/server.py index f212306174..2fc0c13458 100644 --- a/pymongo/asynchronous/server.py +++ b/pymongo/asynchronous/server.py @@ -28,17 +28,15 @@ from bson import _decode_all_selective from pymongo.asynchronous.helpers import _handle_reauth -from pymongo.errors import NotPrimaryError, OperationFailure from pymongo.helpers_shared import _check_command_response from pymongo.logger import ( - _COMMAND_LOGGER, _SDAM_LOGGER, - _CommandStatusMessage, _debug_log, _SDAMStatusMessage, ) -from pymongo.message import _convert_exception, _GetMore, _OpMsg, _Query +from pymongo.message import _GetMore, _OpMsg, _Query from pymongo.response import PinnedResponse, Response +from pymongo.telemetry import command_telemetry if TYPE_CHECKING: from queue import Queue @@ -170,140 +168,66 @@ async def run_operation( message = operation.get_message(read_preference, conn, use_cmd) request_id, data, max_doc_size = self._split_message(message) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) - - if publish: - if "$db" not in cmd: - cmd["$db"] = dbn - assert listeners is not None - listeners.publish_command_start( - cmd, - dbn, - request_id, - conn.address, - conn.server_connection_id, - service_id=conn.service_id, - ) - - try: - if more_to_come: - reply = await conn.receive_message(None) - else: - await conn.send_message(data, max_doc_size) - reply = await conn.receive_message(request_id) - - # Unpack and check for command errors. - if use_cmd: - user_fields = _CURSOR_DOC_FIELDS - legacy_response = False - else: - user_fields = None - legacy_response = True - docs = unpack_res( - reply, - operation.cursor_id, - operation.codec_options, - legacy_response=legacy_response, - user_fields=user_fields, - ) + if publish and "$db" not in cmd: + cmd["$db"] = dbn + + with command_telemetry( + command_name=operation.name, + database_name=dbn, + spec=cmd, + driver_connection_id=conn.id, + server_connection_id=conn.server_connection_id, + publish_event=publish, + start_time=start, + address=conn.address, + listeners=listeners, + client=client, + request_id=request_id, + service_id=conn.service_id, + ) as telemetry: + try: + if more_to_come: + reply = await conn.receive_message(None) + else: + await conn.send_message(data, max_doc_size) + reply = await conn.receive_message(request_id) + + # Unpack and check for command errors. + if use_cmd: + user_fields = _CURSOR_DOC_FIELDS + legacy_response = False + else: + user_fields = None + legacy_response = True + docs = unpack_res( + reply, + operation.cursor_id, + operation.codec_options, + legacy_response=legacy_response, + user_fields=user_fields, + ) + if use_cmd: + first = docs[0] + await operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type] + _check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type] + except Exception as exc: + telemetry.publish_failed(exc) + raise + + # Must publish in find / getMore / explain command response format. if use_cmd: - first = docs[0] - await operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type] - _check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type] - except Exception as exc: - duration = datetime.now() - start - if isinstance(exc, (NotPrimaryError, OperationFailure)): - failure: _DocumentOut = exc.details # type: ignore[assignment] + res = docs[0] + elif operation.name == "explain": + res = docs[0] if docs else {} else: - failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - if publish: - assert listeners is not None - listeners.publish_command_failure( - duration, - failure, - operation.name, - request_id, - conn.address, - conn.server_connection_id, - service_id=conn.service_id, - database_name=dbn, - ) - raise + res = {"cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, "ok": 1} # type: ignore[union-attr] + if operation.name == "find": + res["cursor"]["firstBatch"] = docs + else: + res["cursor"]["nextBatch"] = docs + telemetry.publish_succeeded(res) + duration = datetime.now() - start - # Must publish in find / getMore / explain command response - # format. - if use_cmd: - res = docs[0] - elif operation.name == "explain": - res = docs[0] if docs else {} - else: - res = {"cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, "ok": 1} # type: ignore[union-attr] - if operation.name == "find": - res["cursor"]["firstBatch"] = docs - else: - res["cursor"]["nextBatch"] = docs - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=res, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) - if publish: - assert listeners is not None - listeners.publish_command_success( - duration, - res, - operation.name, - request_id, - conn.address, - conn.server_connection_id, - service_id=conn.service_id, - database_name=dbn, - ) # Decrypt response. client = operation.client # type: ignore[assignment] diff --git a/pymongo/synchronous/bulk.py b/pymongo/synchronous/bulk.py index 22d6a7a76a..1fe6636646 100644 --- a/pymongo/synchronous/bulk.py +++ b/pymongo/synchronous/bulk.py @@ -19,8 +19,6 @@ from __future__ import annotations import copy -import datetime -import logging from collections.abc import MutableMapping from itertools import islice from typing import ( @@ -55,13 +53,11 @@ OperationFailure, ) from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import ( _DELETE, _INSERT, _UPDATE, _BulkWriteContext, - _convert_exception, _convert_write_result, _EncryptedBulkWriteContext, _randint, @@ -69,6 +65,7 @@ from pymongo.read_preferences import ReadPreference from pymongo.synchronous.client_session import ClientSession, _validate_session_write_concern from pymongo.synchronous.helpers import _handle_reauth +from pymongo.telemetry import command_telemetry from pymongo.write_concern import WriteConcern if TYPE_CHECKING: @@ -252,78 +249,31 @@ def write_command( ) -> dict[str, Any]: """A proxy for SocketInfo.write_command that handles event publishing.""" cmd[bwc.field] = docs - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._start(cmd, request_id, docs) - try: - reply = bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc] - duration = datetime.datetime.now() - bwc.start_time - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] - client._process_response(reply, bwc.session) # type: ignore[arg-type] - except Exception as exc: - duration = datetime.datetime.now() - bwc.start_time - if isinstance(exc, (NotPrimaryError, OperationFailure)): - failure: _DocumentOut = exc.details # type: ignore[assignment] - else: - failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - - if bwc.publish: - bwc._fail(request_id, failure, duration) - # Process the response from the server. - if isinstance(exc, (NotPrimaryError, OperationFailure)): - client._process_response(exc.details, bwc.session) # type: ignore[arg-type] - raise + with command_telemetry( + command_name=bwc.name, + database_name=bwc.db_name, + spec=cmd, + driver_connection_id=bwc.conn.id, + server_connection_id=bwc.conn.server_connection_id, + publish_event=bwc.publish, + start_time=bwc.start_time, + address=bwc.conn.address, + listeners=bwc.listeners, + client=client, + request_id=request_id, + service_id=bwc.conn.service_id, + operation_id=bwc.op_id, + ) as telemetry: + try: + reply = bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc] + telemetry.publish_succeeded(reply) # type: ignore[misc, arg-type] + client._process_response(reply, bwc.session) # type: ignore[arg-type] + except Exception as exc: + telemetry.publish_failed(exc) + # Process the response from the server. + if isinstance(exc, (NotPrimaryError, OperationFailure)): + client._process_response(exc.details, bwc.session) # type: ignore[arg-type] + raise return reply # type: ignore[return-value] def unack_write( @@ -337,81 +287,33 @@ def unack_write( client: MongoClient[Any], ) -> Optional[Mapping[str, Any]]: """A proxy for Connection.unack_write that handles event publishing.""" - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - cmd = bwc._start(cmd, request_id, docs) - try: - result = bwc.conn.unack_write(msg, max_doc_size) # type: ignore[func-returns-value, misc, override] - duration = datetime.datetime.now() - bwc.start_time - if result is not None: - reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type] - else: - # Comply with APM spec. - reply = {"ok": 1} - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._succeed(request_id, reply, duration) - except Exception as exc: - duration = datetime.datetime.now() - bwc.start_time - if isinstance(exc, OperationFailure): - failure: _DocumentOut = _convert_write_result(bwc.name, cmd, exc.details) # type: ignore[arg-type] - elif isinstance(exc, NotPrimaryError): - failure = exc.details # type: ignore[assignment] - else: - failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - if bwc.publish: - assert bwc.start_time is not None - bwc._fail(request_id, failure, duration) - raise + cmd[bwc.field] = docs + with command_telemetry( + command_name=bwc.name, + database_name=bwc.db_name, + spec=cmd, + driver_connection_id=bwc.conn.id, + server_connection_id=bwc.conn.server_connection_id, + publish_event=bwc.publish, + start_time=bwc.start_time, + address=bwc.conn.address, + listeners=bwc.listeners, + client=client, + request_id=request_id, + service_id=bwc.conn.service_id, + operation_id=bwc.op_id, + ) as telemetry: + try: + result = bwc.conn.unack_write(msg, max_doc_size) # type: ignore[func-returns-value, misc, override] + if result is not None: + reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type] + else: + # Comply with APM spec. + reply = {"ok": 1} + telemetry.publish_succeeded(reply) # type: ignore[misc, arg-type] + except Exception as exc: + telemetry.publish_failed(exc) + raise return result # type: ignore[return-value] def _execute_batch_unack( diff --git a/pymongo/synchronous/client_bulk.py b/pymongo/synchronous/client_bulk.py index a606d028e1..090a1c1d70 100644 --- a/pymongo/synchronous/client_bulk.py +++ b/pymongo/synchronous/client_bulk.py @@ -19,8 +19,6 @@ from __future__ import annotations import copy -import datetime -import logging from collections.abc import MutableMapping from itertools import islice from typing import ( @@ -62,11 +60,9 @@ WaitQueueTimeoutError, ) from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import ( _ClientBulkWriteContext, _convert_client_bulk_exception, - _convert_exception, _convert_write_result, _randint, ) @@ -77,6 +73,7 @@ InsertOneResult, UpdateResult, ) +from pymongo.telemetry import command_telemetry from pymongo.typings import _DocumentOut, _Pipeline from pymongo.write_concern import WriteConcern @@ -238,82 +235,35 @@ def write_command( """A proxy for Connection.write_command that handles event publishing.""" cmd["ops"] = op_docs cmd["nsInfo"] = ns_docs - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._start(cmd, request_id, op_docs, ns_docs) - try: - reply = bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type] - duration = datetime.datetime.now() - bwc.start_time - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] - # Process the response from the server. - self.client._process_response(reply, bwc.session) # type: ignore[arg-type] - except Exception as exc: - duration = datetime.datetime.now() - bwc.start_time - if isinstance(exc, (NotPrimaryError, OperationFailure)): - failure: _DocumentOut = exc.details # type: ignore[assignment] - else: - failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - - if bwc.publish: - bwc._fail(request_id, failure, duration) - # Top-level error will be embedded in ClientBulkWriteException. - reply = {"error": exc} - # Process the response from the server. - if isinstance(exc, OperationFailure): - self.client._process_response(exc.details, bwc.session) # type: ignore[arg-type] - else: - self.client._process_response({}, bwc.session) # type: ignore[arg-type] + with command_telemetry( + command_name=bwc.name, + database_name=bwc.db_name, + spec=cmd, + driver_connection_id=bwc.conn.id, + server_connection_id=bwc.conn.server_connection_id, + publish_event=bwc.publish, + start_time=bwc.start_time, + address=bwc.conn.address, + listeners=bwc.listeners, + client=client, + request_id=request_id, + service_id=bwc.conn.service_id, + operation_id=bwc.op_id, + ) as telemetry: + try: + reply = bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type] + telemetry.publish_succeeded(reply) # type: ignore[misc, arg-type] + # Process the response from the server. + self.client._process_response(reply, bwc.session) # type: ignore[arg-type] + except Exception as exc: + telemetry.publish_failed(exc) + # Top-level error will be embedded in ClientBulkWriteException. + reply = {"error": exc} + # Process the response from the server. + if isinstance(exc, OperationFailure): + self.client._process_response(exc.details, bwc.session) # type: ignore[arg-type] + else: + self.client._process_response({}, bwc.session) # type: ignore[arg-type] return reply # type: ignore[return-value] def unack_write( @@ -327,82 +277,35 @@ def unack_write( client: MongoClient[Any], ) -> Optional[Mapping[str, Any]]: """A proxy for Connection.unack_write that handles event publishing.""" - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - cmd = bwc._start(cmd, request_id, op_docs, ns_docs) - try: - result = bwc.conn.unack_write(msg, bwc.max_bson_size) # type: ignore[func-returns-value, misc, override] - duration = datetime.datetime.now() - bwc.start_time - if result is not None: - reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type] - else: - # Comply with APM spec. - reply = {"ok": 1} - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._succeed(request_id, reply, duration) - except Exception as exc: - duration = datetime.datetime.now() - bwc.start_time - if isinstance(exc, OperationFailure): - failure: _DocumentOut = _convert_write_result(bwc.name, cmd, exc.details) # type: ignore[arg-type] - elif isinstance(exc, NotPrimaryError): - failure = exc.details # type: ignore[assignment] - else: - failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - if bwc.publish: - assert bwc.start_time is not None - bwc._fail(request_id, failure, duration) - # Top-level error will be embedded in ClientBulkWriteException. - reply = {"error": exc} + cmd["ops"] = op_docs + cmd["nsInfo"] = ns_docs + with command_telemetry( + command_name=bwc.name, + database_name=bwc.db_name, + spec=cmd, + driver_connection_id=bwc.conn.id, + server_connection_id=bwc.conn.server_connection_id, + publish_event=bwc.publish, + start_time=bwc.start_time, + address=bwc.conn.address, + listeners=bwc.listeners, + client=client, + request_id=request_id, + service_id=bwc.conn.service_id, + operation_id=bwc.op_id, + ) as telemetry: + try: + result = bwc.conn.unack_write(msg, bwc.max_bson_size) # type: ignore[func-returns-value, misc, override] + if result is not None: + reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type] + else: + # Comply with APM spec. + reply = {"ok": 1} + telemetry.publish_succeeded(reply) # type: ignore[misc, arg-type] + except Exception as exc: + telemetry.publish_failed(exc) + # Top-level error will be embedded in ClientBulkWriteException. + reply = {"error": exc} return reply def _execute_batch_unack( diff --git a/pymongo/synchronous/network.py b/pymongo/synchronous/network.py index 7d9bca4d58..ccde5c7c80 100644 --- a/pymongo/synchronous/network.py +++ b/pymongo/synchronous/network.py @@ -16,7 +16,6 @@ from __future__ import annotations import datetime -import logging from typing import ( TYPE_CHECKING, Any, @@ -31,17 +30,13 @@ from bson import _decode_all_selective from pymongo import _csot, helpers_shared, message from pymongo.compression_support import _NO_COMPRESSION -from pymongo.errors import ( - NotPrimaryError, - OperationFailure, -) -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import _OpMsg from pymongo.monitoring import _is_speculative_authenticate from pymongo.network_layer import ( receive_message, sendall, ) +from pymongo.telemetry import command_telemetry if TYPE_CHECKING: from bson import CodecOptions @@ -159,140 +154,62 @@ def command( if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD: message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD) - if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=spec, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) - if publish: - assert listeners is not None - assert address is not None - listeners.publish_command_start( - orig, - dbname, - request_id, - address, - conn.server_connection_id, - service_id=conn.service_id, - ) - try: - sendall(conn.conn.get_conn, msg) - if use_op_msg and unacknowledged: - # Unacknowledged, fake a successful command response. - reply = None - response_doc: _DocumentOut = {"ok": 1} - else: - reply = receive_message(conn, request_id) - conn.more_to_come = reply.more_to_come - unpacked_docs = reply.unpack_response( - codec_options=codec_options, user_fields=user_fields - ) - - response_doc = unpacked_docs[0] - if not conn.ready: - cluster_time = response_doc.get("$clusterTime") - if cluster_time: - conn._cluster_time = cluster_time - if client: - client._process_response(response_doc, session) - if check: - helpers_shared._check_command_response( - response_doc, - conn.max_wire_version, - allowable_errors, - parse_write_concern_error=parse_write_concern_error, - ) - except Exception as exc: - duration = datetime.datetime.now() - start - if isinstance(exc, (NotPrimaryError, OperationFailure)): - failure: _DocumentOut = exc.details # type: ignore[assignment] - else: - failure = message._convert_exception(exc) - if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), + with command_telemetry( + command_name=name, + database_name=dbname, + spec=spec, + address=address if address else conn.address, + driver_connection_id=conn.id, + server_connection_id=conn.server_connection_id, + publish_event=publish, + start_time=start, + client=client, + listeners=listeners, + request_id=request_id, + service_id=conn.service_id, + ) as telemetry: + try: + sendall(conn.conn.get_conn, msg) + if use_op_msg and unacknowledged: + # Unacknowledged, fake a successful command response. + reply = None + response_doc: _DocumentOut = {"ok": 1} + else: + reply = receive_message(conn, request_id) + conn.more_to_come = reply.more_to_come + unpacked_docs = reply.unpack_response( + codec_options=codec_options, user_fields=user_fields ) - if publish: - assert listeners is not None - assert address is not None - listeners.publish_command_failure( - duration, - failure, - name, - request_id, - address, - conn.server_connection_id, - service_id=conn.service_id, - database_name=dbname, - ) - raise - duration = datetime.datetime.now() - start - if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=response_doc, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - speculative_authenticate="speculativeAuthenticate" in orig, - ) - if publish: - assert listeners is not None - assert address is not None - listeners.publish_command_success( - duration, - response_doc, - name, - request_id, - address, - conn.server_connection_id, - service_id=conn.service_id, + + response_doc = unpacked_docs[0] + if not conn.ready: + cluster_time = response_doc.get("$clusterTime") + if cluster_time: + conn._cluster_time = cluster_time + if client: + client._process_response(response_doc, session) + if check: + helpers_shared._check_command_response( + response_doc, + conn.max_wire_version, + allowable_errors, + parse_write_concern_error=parse_write_concern_error, + ) + except Exception as exc: + telemetry.publish_failed(exc) + raise + + telemetry.publish_succeeded( + reply=response_doc, speculative_hello=speculative_hello, - database_name=dbname, + speculative_authenticate="speculativeAuthenticate" in orig, ) - if client and client._encrypter and reply: - decrypted = client._encrypter.decrypt(reply.raw_command_response()) - response_doc = cast( - "_DocumentOut", _decode_all_selective(decrypted, codec_options, user_fields)[0] - ) + if client and client._encrypter and reply: + decrypted = client._encrypter.decrypt(reply.raw_command_response()) + response_doc = cast( + "_DocumentOut", _decode_all_selective(decrypted, codec_options, user_fields)[0] + ) - return response_doc # type: ignore[return-value] + return response_doc # type: ignore[return-value] diff --git a/pymongo/synchronous/server.py b/pymongo/synchronous/server.py index f57420918b..6d0dcc612f 100644 --- a/pymongo/synchronous/server.py +++ b/pymongo/synchronous/server.py @@ -27,18 +27,16 @@ ) from bson import _decode_all_selective -from pymongo.errors import NotPrimaryError, OperationFailure from pymongo.helpers_shared import _check_command_response from pymongo.logger import ( - _COMMAND_LOGGER, _SDAM_LOGGER, - _CommandStatusMessage, _debug_log, _SDAMStatusMessage, ) -from pymongo.message import _convert_exception, _GetMore, _OpMsg, _Query +from pymongo.message import _GetMore, _OpMsg, _Query from pymongo.response import PinnedResponse, Response from pymongo.synchronous.helpers import _handle_reauth +from pymongo.telemetry import command_telemetry if TYPE_CHECKING: from queue import Queue @@ -170,140 +168,66 @@ def run_operation( message = operation.get_message(read_preference, conn, use_cmd) request_id, data, max_doc_size = self._split_message(message) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) - - if publish: - if "$db" not in cmd: - cmd["$db"] = dbn - assert listeners is not None - listeners.publish_command_start( - cmd, - dbn, - request_id, - conn.address, - conn.server_connection_id, - service_id=conn.service_id, - ) - - try: - if more_to_come: - reply = conn.receive_message(None) - else: - conn.send_message(data, max_doc_size) - reply = conn.receive_message(request_id) - - # Unpack and check for command errors. - if use_cmd: - user_fields = _CURSOR_DOC_FIELDS - legacy_response = False - else: - user_fields = None - legacy_response = True - docs = unpack_res( - reply, - operation.cursor_id, - operation.codec_options, - legacy_response=legacy_response, - user_fields=user_fields, - ) + if publish and "$db" not in cmd: + cmd["$db"] = dbn + + with command_telemetry( + command_name=operation.name, + database_name=dbn, + spec=cmd, + driver_connection_id=conn.id, + server_connection_id=conn.server_connection_id, + publish_event=publish, + start_time=start, + address=conn.address, + listeners=listeners, + client=client, + request_id=request_id, + service_id=conn.service_id, + ) as telemetry: + try: + if more_to_come: + reply = conn.receive_message(None) + else: + conn.send_message(data, max_doc_size) + reply = conn.receive_message(request_id) + + # Unpack and check for command errors. + if use_cmd: + user_fields = _CURSOR_DOC_FIELDS + legacy_response = False + else: + user_fields = None + legacy_response = True + docs = unpack_res( + reply, + operation.cursor_id, + operation.codec_options, + legacy_response=legacy_response, + user_fields=user_fields, + ) + if use_cmd: + first = docs[0] + operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type] + _check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type] + except Exception as exc: + telemetry.publish_failed(exc) + raise + + # Must publish in find / getMore / explain command response format. if use_cmd: - first = docs[0] - operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type] - _check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type] - except Exception as exc: - duration = datetime.now() - start - if isinstance(exc, (NotPrimaryError, OperationFailure)): - failure: _DocumentOut = exc.details # type: ignore[assignment] + res = docs[0] + elif operation.name == "explain": + res = docs[0] if docs else {} else: - failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - if publish: - assert listeners is not None - listeners.publish_command_failure( - duration, - failure, - operation.name, - request_id, - conn.address, - conn.server_connection_id, - service_id=conn.service_id, - database_name=dbn, - ) - raise + res = {"cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, "ok": 1} # type: ignore[union-attr] + if operation.name == "find": + res["cursor"]["firstBatch"] = docs + else: + res["cursor"]["nextBatch"] = docs + telemetry.publish_succeeded(res) + duration = datetime.now() - start - # Must publish in find / getMore / explain command response - # format. - if use_cmd: - res = docs[0] - elif operation.name == "explain": - res = docs[0] if docs else {} - else: - res = {"cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, "ok": 1} # type: ignore[union-attr] - if operation.name == "find": - res["cursor"]["firstBatch"] = docs - else: - res["cursor"]["nextBatch"] = docs - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=res, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) - if publish: - assert listeners is not None - listeners.publish_command_success( - duration, - res, - operation.name, - request_id, - conn.address, - conn.server_connection_id, - service_id=conn.service_id, - database_name=dbn, - ) # Decrypt response. client = operation.client # type: ignore[assignment] diff --git a/pymongo/telemetry.py b/pymongo/telemetry.py new file mode 100644 index 0000000000..b1308d93c9 --- /dev/null +++ b/pymongo/telemetry.py @@ -0,0 +1,415 @@ +# Copyright 2026-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unified telemetry support for PyMongo. + +Supports telemetry through standardized logging, event publishing, and OpenTelemetry. + +To enable OpenTelemetry logging, set the environment variable: + OTEL_PYTHON_INSTRUMENTATION_MONGODB_ENABLED=true + +.. versionadded:: 4.x +""" +from __future__ import annotations + +import logging +import os +from datetime import datetime +from typing import TYPE_CHECKING, Any, Mapping, Optional + +from pymongo import message +from pymongo.errors import NotPrimaryError, OperationFailure +from pymongo.logger import _COMMAND_LOGGER, _SENSITIVE_COMMANDS, _CommandStatusMessage, _debug_log +from pymongo.monitoring import _EventListeners + +try: + from opentelemetry import trace # type: ignore[import-not-found] + from opentelemetry.trace import ( # type: ignore[import-not-found] + Span, + SpanKind, + Status, + StatusCode, + Tracer, + ) + + _HAS_OPENTELEMETRY = True +except ImportError: + _HAS_OPENTELEMETRY = False + trace = None + Span = None + SpanKind = None + Status = None + StatusCode = None + Tracer = None + +if TYPE_CHECKING: + from pymongo.typings import _Address, _AgnosticMongoClient, _DocumentOut + + +_OTEL_ENABLED_ENV = "OTEL_PYTHON_INSTRUMENTATION_MONGODB_ENABLED" + + +def _is_tracing_enabled() -> bool: + if not _HAS_OPENTELEMETRY: + return False + value = os.environ.get(_OTEL_ENABLED_ENV, "").lower() + return value in ("1", "true") + + +def _get_tracer() -> Optional[Tracer]: + if not _HAS_OPENTELEMETRY or not _is_tracing_enabled(): + return None + from pymongo._version import __version__ + + return trace.get_tracer("PyMongo", __version__) + + +def _is_sensitive_command(command_name: str) -> bool: + return command_name.lower() in _SENSITIVE_COMMANDS + + +def _build_query_summary( + command_name: str, + database_name: str, + collection_name: Optional[str], +) -> str: + """Build the db.query.summary attribute value.""" + if collection_name: + return f"{command_name} {database_name}.{collection_name}" + return f"{command_name} {database_name}" + + +def _extract_collection_name(spec: Mapping[str, Any]) -> Optional[str]: + """Extract collection name from command spec if applicable.""" + if not spec: + return None + cmd_name = next(iter(spec)).lower() + # Commands where the first value is the collection name + if cmd_name in ( + "insert", + "update", + "delete", + "find", + "aggregate", + "findandmodify", + "count", + "distinct", + "create", + "drop", + "createindexes", + "dropindexes", + "listindexes", + ): + value = spec.get(next(iter(spec))) + if isinstance(value, str): + return value + return None + + +class _CommandTelemetry: + """Manages telemetry for MongoDB commands, including logging, event publishing, and OpenTelemetry spans. + + This class is a context manager that handles the full lifecycle of command telemetry: + - On entry: sets up OpenTelemetry span (if enabled) and publishes the started event and/or log + - On exit: cleans up the span context (caller handles success/failure publishing) + """ + + __slots__ = ( + "_command_name", + "_database_name", + "_spec", + "_driver_connection_id", + "_server_connection_id", + "_publish_event", + "_start_time", + "_address", + "_listeners", + "_client", + "_request_id", + "_operation_id", + "_service_id", + "_span", + "_span_context", + ) + + def __init__( + self, + command_name: str, + database_name: str, + spec: Mapping[str, Any], + driver_connection_id: int, + server_connection_id: Optional[int], + publish_event: bool, + start_time: datetime, + address: _Address, + listeners: Optional[_EventListeners], + client: Optional[_AgnosticMongoClient], + request_id: int, + service_id: Optional[Any], + operation_id: Optional[int] = None, + ): + self._command_name = command_name + self._database_name = database_name + self._spec = spec + self._driver_connection_id = driver_connection_id + self._server_connection_id = server_connection_id + self._publish_event = publish_event + self._start_time = start_time + self._address = address + self._listeners = listeners + self._client = client + self._request_id = request_id + self._operation_id = operation_id if operation_id is not None else request_id + self._service_id = service_id + self._span: Optional[Span] = None + self._span_context: Optional[Any] = None + + def __enter__(self) -> _CommandTelemetry: + self._setup_span() + self.publish_started() + return self + + def __exit__( + self, + exc_type: Optional[type], + exc_val: Optional[BaseException], + exc_tb: Optional[Any], + ) -> None: + if self._span_context is not None: + self._span_context.__exit__(exc_type, exc_val, exc_tb) + + def _setup_span(self) -> None: + """Set up OpenTelemetry span if tracing is enabled and command is not sensitive.""" + tracer = _get_tracer() + + if tracer is None or _is_sensitive_command(self._command_name): + return + + collection_name = _extract_collection_name(self._spec) + query_summary = _build_query_summary( + self._command_name, self._database_name, collection_name + ) + + self._span_context = tracer.start_as_current_span( + name=self._command_name, + kind=SpanKind.CLIENT, + ) + self._span = self._span_context.__enter__() + + self._span.set_attribute("db.system", "mongodb") + self._span.set_attribute("db.namespace", self._database_name) + self._span.set_attribute("db.command.name", self._command_name) + self._span.set_attribute("db.query.summary", query_summary) + if self._address: + self._span.set_attribute("server.address", self._address[0]) + self._span.set_attribute("server.port", self._address[1]) + self._span.set_attribute("network.transport", "tcp") + self._span.set_attribute("db.mongodb.driver_connection_id", self._driver_connection_id) + + if collection_name: + self._span.set_attribute("db.collection.name", collection_name) + if self._server_connection_id is not None: + self._span.set_attribute("db.mongodb.server_connection_id", self._server_connection_id) + + @property + def span(self) -> Optional[Span]: + """Return the OpenTelemetry span, or None if tracing is disabled.""" + return self._span + + def publish_started(self) -> None: + """Publish command started event and log if enabled.""" + if self._client is not None: + if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _COMMAND_LOGGER, + message=_CommandStatusMessage.STARTED, + clientId=self._client._topology_settings._topology_id, + command=self._spec, + commandName=next(iter(self._spec)), + databaseName=self._database_name, + requestId=self._request_id, + operationId=self._operation_id, + driverConnectionId=self._driver_connection_id, + serverConnectionId=self._server_connection_id, + serverHost=self._address[0] if self._address else None, + serverPort=self._address[1] if self._address else None, + serviceId=self._service_id, + ) + if self._publish_event: + assert self._listeners is not None + assert self._address is not None + self._listeners.publish_command_start( + self._spec, # type: ignore[arg-type] + self._database_name, + self._request_id, + self._address, + self._server_connection_id, + op_id=self._operation_id, + service_id=self._service_id, + ) + + def publish_succeeded( + self, + reply: _DocumentOut, + speculative_hello: bool = False, + speculative_authenticate: bool = False, + ) -> None: + """Publish command succeeded event and log if enabled.""" + duration = datetime.now() - self._start_time + + # Add cursor_id to span if present in response + if self._span is not None and isinstance(reply, dict): + cursor_info = reply.get("cursor") + if cursor_info and isinstance(cursor_info, dict): + cursor_id = cursor_info.get("id", 0) + if cursor_id: + self._span.set_attribute("db.mongodb.cursor_id", cursor_id) + + if self._client is not None: + if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _COMMAND_LOGGER, + message=_CommandStatusMessage.SUCCEEDED, + clientId=self._client._topology_settings._topology_id, + durationMS=duration, + reply=reply, + commandName=next(iter(self._spec)), + databaseName=self._database_name, + requestId=self._request_id, + operationId=self._operation_id, + driverConnectionId=self._driver_connection_id, + serverConnectionId=self._server_connection_id, + serverHost=self._address[0] if self._address else None, + serverPort=self._address[1] if self._address else None, + serviceId=self._service_id, + speculative_authenticate=speculative_authenticate, + ) + if self._publish_event: + assert self._listeners is not None + assert self._address is not None + self._listeners.publish_command_success( + duration, + reply, + self._command_name, + self._request_id, + self._address, + self._server_connection_id, + op_id=self._operation_id, + service_id=self._service_id, + speculative_hello=speculative_hello, + database_name=self._database_name, + ) + + def publish_failed(self, exc: Exception) -> None: + """Publish command failed event and log if enabled.""" + duration = datetime.now() - self._start_time + if isinstance(exc, (NotPrimaryError, OperationFailure)): + failure: _DocumentOut = exc.details # type: ignore[assignment] + else: + failure = message._convert_exception(exc) + + if self._span is not None: + error_code = getattr(exc, "code", None) + self._span.record_exception(exc) + self._span.set_status(Status(StatusCode.ERROR, str(exc))) + + if error_code is not None: + self._span.set_attribute("db.response.status_code", str(error_code)) + if self._client is not None: + if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _COMMAND_LOGGER, + message=_CommandStatusMessage.FAILED, + clientId=self._client._topology_settings._topology_id, + durationMS=duration, + failure=failure, + commandName=next(iter(self._spec)), + databaseName=self._database_name, + requestId=self._request_id, + operationId=self._operation_id, + driverConnectionId=self._driver_connection_id, + serverConnectionId=self._server_connection_id, + serverHost=self._address[0] if self._address else None, + serverPort=self._address[1] if self._address else None, + serviceId=self._service_id, + isServerSideError=isinstance(exc, OperationFailure), + ) + if self._publish_event: + assert self._listeners is not None + assert self._address is not None + self._listeners.publish_command_failure( + duration, + failure, + self._command_name, + self._request_id, + self._address, + self._server_connection_id, + op_id=self._operation_id, + service_id=self._service_id, + database_name=self._database_name, + ) + + +def command_telemetry( + command_name: str, + database_name: str, + spec: Mapping[str, Any], + driver_connection_id: int, + server_connection_id: Optional[int], + publish_event: bool, + start_time: datetime, + request_id: int, + address: _Address, + listeners: Optional[_EventListeners] = None, + client: Optional[_AgnosticMongoClient] = None, + service_id: Optional[Any] = None, + operation_id: Optional[int] = None, +) -> _CommandTelemetry: + """Create a _CommandTelemetry context manager for command telemetry. + + Returns a _CommandTelemetry instance that should be used as a context manager. + The context manager automatically: + - Sets up OpenTelemetry span if tracing is enabled and command is not sensitive + - Publishes the started event and/or log on entry if enabled + - Cleans up the span context on exit + + The caller is responsible for calling publish_succeeded() on successful completion + and publish_failed() if an exception occurs. + + Example usage:: + + with command_telemetry(...) as telemetry: + try: + # execute command + result = execute_command() + except Exception as exc: + telemetry.publish_failed(exc) + raise + telemetry.publish_succeeded(result) + """ + return _CommandTelemetry( + command_name=command_name, + database_name=database_name, + spec=spec, + driver_connection_id=driver_connection_id, + server_connection_id=server_connection_id, + publish_event=publish_event, + start_time=start_time, + address=address, + listeners=listeners, + client=client, + request_id=request_id, + service_id=service_id, + operation_id=operation_id, + ) diff --git a/pyproject.toml b/pyproject.toml index 9b3287834a..fab9719b15 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -83,6 +83,7 @@ docs = ["requirements/docs.txt"] encryption = ["requirements/encryption.txt"] gssapi = ["requirements/gssapi.txt"] ocsp = ["requirements/ocsp.txt"] +opentelemetry = ["requirements/opentelemetry.txt"] snappy = ["requirements/snappy.txt"] test = ["requirements/test.txt"] zstd = ["requirements/zstd.txt"] diff --git a/requirements/opentelemetry.txt b/requirements/opentelemetry.txt new file mode 100644 index 0000000000..d20388d07f --- /dev/null +++ b/requirements/opentelemetry.txt @@ -0,0 +1 @@ +opentelemetry-api>=1.20.0 diff --git a/uv.lock b/uv.lock index 78d0cc213f..c8eb154730 100644 --- a/uv.lock +++ b/uv.lock @@ -1424,6 +1424,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d2/1d/1b658dbd2b9fa9c4c9f32accbfc0205d532c8c6194dc0f2a4c0428e7128a/nodeenv-1.9.1-py2.py3-none-any.whl", hash = "sha256:ba11c9782d29c27c70ffbdda2d7415098754709be8a7056d79a737cd901155c9", size = 22314, upload-time = "2024-06-04T18:44:08.352Z" }, ] +[[package]] +name = "opentelemetry-api" +version = "1.39.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "importlib-metadata" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/97/b9/3161be15bb8e3ad01be8be5a968a9237c3027c5be504362ff800fca3e442/opentelemetry_api-1.39.1.tar.gz", hash = "sha256:fbde8c80e1b937a2c61f20347e91c0c18a1940cecf012d62e65a7caf08967c9c", size = 65767, upload-time = "2025-12-11T13:32:39.182Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/cf/df/d3f1ddf4bb4cb50ed9b1139cc7b1c54c34a1e7ce8fd1b9a37c0d1551a6bd/opentelemetry_api-1.39.1-py3-none-any.whl", hash = "sha256:2edd8463432a7f8443edce90972169b195e7d6a05500cd29e6d13898187c9950", size = 66356, upload-time = "2025-12-11T13:32:17.304Z" }, +] + [[package]] name = "packaging" version = "25.0" @@ -1545,6 +1558,9 @@ ocsp = [ { name = "requests" }, { name = "service-identity" }, ] +opentelemetry = [ + { name = "opentelemetry-api" }, +] snappy = [ { name = "python-snappy" }, ] @@ -1591,6 +1607,7 @@ requires-dist = [ { name = "dnspython", specifier = ">=2.6.1,<3.0.0" }, { name = "furo", marker = "extra == 'docs'", specifier = "==2025.12.19" }, { name = "importlib-metadata", marker = "python_full_version < '3.13' and extra == 'test'", specifier = ">=7.0" }, + { name = "opentelemetry-api", marker = "extra == 'opentelemetry'", specifier = ">=1.20.0" }, { name = "pykerberos", marker = "os_name != 'nt' and extra == 'gssapi'", specifier = ">=1.2.4" }, { name = "pymongo-auth-aws", marker = "extra == 'aws'", specifier = ">=1.1.0,<2.0.0" }, { name = "pymongo-auth-aws", marker = "extra == 'encryption'", specifier = ">=1.1.0,<2.0.0" }, @@ -1608,7 +1625,7 @@ requires-dist = [ { name = "sphinxcontrib-shellcheck", marker = "extra == 'docs'", specifier = ">=1,<2" }, { name = "winkerberos", marker = "os_name == 'nt' and extra == 'gssapi'", specifier = ">=0.5.0" }, ] -provides-extras = ["aws", "docs", "encryption", "gssapi", "ocsp", "snappy", "test", "zstd"] +provides-extras = ["aws", "docs", "encryption", "gssapi", "ocsp", "opentelemetry", "snappy", "test", "zstd"] [package.metadata.requires-dev] coverage = [{ name = "coverage", extras = ["toml"], specifier = ">=5,<=7.10.7" }]