diff --git a/test/asynchronous/test_network_layer.py b/test/asynchronous/test_network_layer.py new file mode 100644 index 0000000000..fa207be49f --- /dev/null +++ b/test/asynchronous/test_network_layer.py @@ -0,0 +1,500 @@ +# Copyright 2026-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for network_layer.py.""" + +from __future__ import annotations + +import asyncio +import struct +import sys +from unittest.mock import AsyncMock, MagicMock, patch + +sys.path[0:0] = [""] + +from test.asynchronous import AsyncUnitTest, unittest + +from pymongo.common import MAX_MESSAGE_SIZE +from pymongo.errors import ProtocolError +from pymongo.network_layer import ( + AsyncNetworkingInterface, + NetworkingInterface, + NetworkingInterfaceBase, + PyMongoProtocol, + _async_socket_receive, + sendall, +) + +_IS_SYNC = False + + +async def _make_protocol(timeout=None): + proto = PyMongoProtocol(timeout=timeout) + mock_transport = MagicMock() + mock_transport.is_closing.return_value = False + proto.transport = mock_transport + return proto + + +def _make_header(length, request_id, response_to, op_code): + return struct.pack(" 16) triggers ProtocolError + hdr = _make_header(16, 1, 0, 2013) + proto._header = memoryview(bytearray(hdr)) + proto._header_index = 0 + proto.buffer_updated(16) + self.assertTrue(proto._connection_lost) + + async def test_compression_header_processing(self): + proto = await _make_protocol() + proto._expecting_header = False + proto._expecting_compression = True + comp_hdr = struct.pack(" 16) triggers ProtocolError + hdr = _make_header(16, 1, 0, 2013) + proto._header = memoryview(bytearray(hdr)) + proto._header_index = 0 + proto.buffer_updated(16) + self.assertTrue(proto._connection_lost) + + def test_compression_header_processing(self): + proto = _make_protocol() + proto._expecting_header = False + proto._expecting_compression = True + comp_hdr = struct.pack(" bool: "test_monitor.py", "test_monitoring.py", "test_mongos_load_balancing.py", + "test_network_layer.py", "test_on_demand_csfle.py", "test_pooling.py", "test_raw_bson.py",