From 37f64dc0fb67bb2544ce4aca8943d755fcfd27bc Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Thu, 7 May 2026 16:20:35 -0400 Subject: [PATCH 1/7] PYTHON-5631 - test_direct_client_maintains_pool_to_arbiter waits instead of asserting --- test/asynchronous/test_client.py | 2 +- test/test_client.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index ca150ca6df..9023112b53 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -2715,7 +2715,7 @@ async def test_direct_client_maintains_pool_to_arbiter(self): await listener.async_wait_for_event(monitoring.ConnectionReadyEvent, 1) self.assertEqual(listener.event_count(monitoring.ConnectionCreatedEvent), 1) arbiter = c._topology.get_server_by_address(("c", 3)) - self.assertEqual(len(arbiter.pool.conns), 1) + await async_wait_until(lambda: len(arbiter.pool.conns) == 1, "create 1 pooled connection") # Arbiter pool is marked ready. self.assertEqual(listener.event_count(monitoring.PoolReadyEvent), 1) diff --git a/test/test_client.py b/test/test_client.py index 75d585fdad..fb400b6986 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -2670,7 +2670,7 @@ def test_direct_client_maintains_pool_to_arbiter(self): listener.wait_for_event(monitoring.ConnectionReadyEvent, 1) self.assertEqual(listener.event_count(monitoring.ConnectionCreatedEvent), 1) arbiter = c._topology.get_server_by_address(("c", 3)) - self.assertEqual(len(arbiter.pool.conns), 1) + wait_until(lambda: len(arbiter.pool.conns) == 1, "create 1 pooled connection") # Arbiter pool is marked ready. self.assertEqual(listener.event_count(monitoring.PoolReadyEvent), 1) From 32e391893b1f659205574f1d6352857015b4498c Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 11 May 2026 11:39:04 -0400 Subject: [PATCH 2/7] PYTHON-3923 - Handle socket closures in tests to avoid ResourceWarning --- pymongo/asynchronous/pool.py | 57 ++++++++++++---------- pymongo/pool_shared.py | 89 ++++++++++++++++++++--------------- pymongo/synchronous/pool.py | 57 ++++++++++++---------- pyproject.toml | 5 -- test/__init__.py | 2 + test/asynchronous/__init__.py | 2 + 6 files changed, 122 insertions(+), 90 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index a5d5b28990..6f9aa3492d 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -1065,34 +1065,43 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A raise conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] - async with self.lock: - self.active_contexts.add(conn.cancel_context) - self.active_contexts.discard(tmp_context) - if tmp_context.cancelled: - conn.cancel_context.cancel() - completed_hello = False try: - if not self.is_sdam: - await conn.hello() - completed_hello = True - self.is_writable = conn.is_writable - if handler: - handler.contribute_socket(conn, completed_handshake=False) - - await conn.authenticate() - # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. - except BaseException as e: async with self.lock: - self.active_contexts.discard(conn.cancel_context) - if not completed_hello: - self._handle_connection_error(e) - await conn.close_conn(ConnectionClosedReason.ERROR) - raise + self.active_contexts.add(conn.cancel_context) + self.active_contexts.discard(tmp_context) + if tmp_context.cancelled: + conn.cancel_context.cancel() + completed_hello = False + try: + if not self.is_sdam: + await conn.hello() + completed_hello = True + self.is_writable = conn.is_writable + if handler: + handler.contribute_socket(conn, completed_handshake=False) + + await conn.authenticate() + # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. + except BaseException as e: + async with self.lock: + self.active_contexts.discard(conn.cancel_context) + if not completed_hello: + self._handle_connection_error(e) + await conn.close_conn(ConnectionClosedReason.ERROR) + raise - if handler: - await handler.client._topology.receive_cluster_time(conn._cluster_time) + if handler: + await handler.client._topology.receive_cluster_time(conn._cluster_time) - return conn + return conn + # Catch cancellations that interrupt outside the inner try block above + except BaseException: + if not conn.closed: + try: + await conn.close_conn(ConnectionClosedReason.ERROR) + except BaseException: # noqa: S110 + pass + raise @contextlib.asynccontextmanager async def checkout( diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index a6f434885b..5de8d35455 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -207,6 +207,7 @@ async def _async_create_connection(address: _Address, options: PoolOptions) -> s sock = socket.socket(af, socktype, proto) # Fallback when SOCK_CLOEXEC isn't available. _set_non_inheritable_non_atomic(sock.fileno()) + sock_returned = False try: sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # CSOT: apply timeout to socket connect. @@ -223,14 +224,17 @@ async def _async_create_connection(address: _Address, options: PoolOptions) -> s asyncio.get_running_loop().sock_connect(sock, sa), timeout=timeout ) sock.settimeout(timeout) + sock_returned = True return sock except asyncio.TimeoutError as e: - sock.close() err = socket.timeout("timed out") err.__cause__ = e except OSError as e: - sock.close() err = e # type: ignore[assignment] + finally: + # Always close the socket if it wasn't returned to avoid leaks. + if not sock_returned: + sock.close() if err is not None: raise err @@ -307,48 +311,59 @@ async def _configured_protocol_interface( Sets protocol's SSL and timeout options. """ sock = await _async_create_connection(address, options) - ssl_context = options._ssl_context - timeout = options.socket_timeout + sock_adopted = False + try: + ssl_context = options._ssl_context + timeout = options.socket_timeout - if ssl_context is None: - return AsyncNetworkingInterface( - await asyncio.get_running_loop().create_connection( + if ssl_context is None: + result = await asyncio.get_running_loop().create_connection( lambda: PyMongoProtocol(timeout=timeout), sock=sock ) - ) + sock_adopted = True + return AsyncNetworkingInterface(result) - host = address[0] - try: - # We have to pass hostname / ip address to wrap_socket - # to use SSLContext.check_hostname. - transport, protocol = await asyncio.get_running_loop().create_connection( # type: ignore[call-overload] - lambda: PyMongoProtocol(timeout=timeout), - sock=sock, - server_hostname=host, - ssl=ssl_context, - ) - except _CertificateError: - # Raise _CertificateError directly like we do after match_hostname - # below. - raise - except (OSError, *SSLErrors) as exc: - # We raise AutoReconnect for transient and permanent SSL handshake - # failures alike. Permanent handshake failures, like protocol - # mismatch, will be turned into ServerSelectionTimeoutErrors later. - details = _get_timeout_details(options) - _raise_connection_failure(address, exc, "SSL handshake failed: ", timeout_details=details) - if ( - ssl_context.verify_mode - and not ssl_context.check_hostname - and not options.tls_allow_invalid_hostnames - ): + host = address[0] try: - ssl.match_hostname(transport.get_extra_info("peercert"), hostname=host) # type:ignore[attr-defined,unused-ignore] + # We have to pass hostname / ip address to wrap_socket + # to use SSLContext.check_hostname. + transport, protocol = await asyncio.get_running_loop().create_connection( # type: ignore[call-overload] + lambda: PyMongoProtocol(timeout=timeout), + sock=sock, + server_hostname=host, + ssl=ssl_context, + ) + sock_adopted = True except _CertificateError: - transport.abort() + # Raise _CertificateError directly like we do after match_hostname + # below. raise - - return AsyncNetworkingInterface((transport, protocol)) + except (OSError, *SSLErrors) as exc: + # We raise AutoReconnect for transient and permanent SSL handshake + # failures alike. Permanent handshake failures, like protocol + # mismatch, will be turned into ServerSelectionTimeoutErrors later. + details = _get_timeout_details(options) + _raise_connection_failure( + address, exc, "SSL handshake failed: ", timeout_details=details + ) + if ( + ssl_context.verify_mode + and not ssl_context.check_hostname + and not options.tls_allow_invalid_hostnames + ): + try: + ssl.match_hostname(transport.get_extra_info("peercert"), hostname=host) # type:ignore[attr-defined,unused-ignore] + except _CertificateError: + transport.abort() + raise + + return AsyncNetworkingInterface((transport, protocol)) + finally: + # If cancellation or any exception lands between sock creation and + # transport adoption, asyncio.create_connection has not registered + # cleanup for the sock — close it ourselves so it doesn't leak. + if not sock_adopted: + sock.close() def _create_connection(address: _Address, options: PoolOptions) -> socket.socket: diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 25f2d08fe7..9fd0011b1f 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -1061,34 +1061,43 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect raise conn = Connection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] - with self.lock: - self.active_contexts.add(conn.cancel_context) - self.active_contexts.discard(tmp_context) - if tmp_context.cancelled: - conn.cancel_context.cancel() - completed_hello = False try: - if not self.is_sdam: - conn.hello() - completed_hello = True - self.is_writable = conn.is_writable - if handler: - handler.contribute_socket(conn, completed_handshake=False) - - conn.authenticate() - # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. - except BaseException as e: with self.lock: - self.active_contexts.discard(conn.cancel_context) - if not completed_hello: - self._handle_connection_error(e) - conn.close_conn(ConnectionClosedReason.ERROR) - raise + self.active_contexts.add(conn.cancel_context) + self.active_contexts.discard(tmp_context) + if tmp_context.cancelled: + conn.cancel_context.cancel() + completed_hello = False + try: + if not self.is_sdam: + conn.hello() + completed_hello = True + self.is_writable = conn.is_writable + if handler: + handler.contribute_socket(conn, completed_handshake=False) + + conn.authenticate() + # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. + except BaseException as e: + with self.lock: + self.active_contexts.discard(conn.cancel_context) + if not completed_hello: + self._handle_connection_error(e) + conn.close_conn(ConnectionClosedReason.ERROR) + raise - if handler: - handler.client._topology.receive_cluster_time(conn._cluster_time) + if handler: + handler.client._topology.receive_cluster_time(conn._cluster_time) - return conn + return conn + # Catch cancellations that interrupt outside the inner try block above + except BaseException: + if not conn.closed: + try: + conn.close_conn(ConnectionClosedReason.ERROR) + except BaseException: # noqa: S110 + pass + raise @contextlib.contextmanager def checkout( diff --git a/pyproject.toml b/pyproject.toml index 9b3287834a..dd8a4955d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -108,11 +108,6 @@ filterwarnings = [ # pytest-asyncio known issue: https://github.com/pytest-dev/pytest-asyncio/issues/1032 "module:.*WindowsSelectorEventLoopPolicy:DeprecationWarning", "module:.*et_event_loop_policy:DeprecationWarning", - # TODO: Remove as part of PYTHON-3923. - "module:unclosed Date: Mon, 18 May 2026 10:50:15 -0400 Subject: [PATCH 3/7] Clarify comment --- pymongo/pool_shared.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index 5de8d35455..ebb84c3f87 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -224,6 +224,7 @@ async def _async_create_connection(address: _Address, options: PoolOptions) -> s asyncio.get_running_loop().sock_connect(sock, sa), timeout=timeout ) sock.settimeout(timeout) + # Set immediately before return. Do not insert an await between this and the return sock_returned = True return sock except asyncio.TimeoutError as e: From 20c5c6bfa972d34ece5a8270f4980db9c293482f Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 18 May 2026 11:38:22 -0400 Subject: [PATCH 4/7] Comment clarity --- pymongo/pool_shared.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index ebb84c3f87..1ad84478da 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -360,9 +360,10 @@ async def _configured_protocol_interface( return AsyncNetworkingInterface((transport, protocol)) finally: - # If cancellation or any exception lands between sock creation and + # If cancellation or any exception lands between socket creation and # transport adoption, asyncio.create_connection has not registered - # cleanup for the sock — close it ourselves so it doesn't leak. + # cleanup for the sock. + # Close it ourselves to prevent leaks. if not sock_adopted: sock.close() From eceb69faa78938c7fad651c1919f0f7a53cd3ab4 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 18 May 2026 13:29:21 -0400 Subject: [PATCH 5/7] Fix pooling test leak --- test/asynchronous/test_pooling.py | 1 + test/test_pooling.py | 1 + 2 files changed, 2 insertions(+) diff --git a/test/asynchronous/test_pooling.py b/test/asynchronous/test_pooling.py index 9db9b5ab3a..6ae84be6e6 100644 --- a/test/asynchronous/test_pooling.py +++ b/test/asynchronous/test_pooling.py @@ -172,6 +172,7 @@ async def create_pool(self, pair=None, *args, **kwargs): kwargs["server_api"] = pool_options.server_api pool = Pool(pair, PoolOptions(*args, **kwargs)) await pool.ready() + self.addAsyncCleanup(pool.close) return pool diff --git a/test/test_pooling.py b/test/test_pooling.py index 95558d00d5..ed5fa40020 100644 --- a/test/test_pooling.py +++ b/test/test_pooling.py @@ -172,6 +172,7 @@ def create_pool(self, pair=None, *args, **kwargs): kwargs["server_api"] = pool_options.server_api pool = Pool(pair, PoolOptions(*args, **kwargs)) pool.ready() + self.addCleanup(pool.close) return pool From b01931495a240f7e86b4f66ff53aef59b08b701b Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 18 May 2026 14:17:33 -0400 Subject: [PATCH 6/7] Fix asyncio transport socket leak --- pymongo/pool_shared.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index 1ad84478da..2e4522ea3f 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -312,15 +312,14 @@ async def _configured_protocol_interface( Sets protocol's SSL and timeout options. """ sock = await _async_create_connection(address, options) + ssl_context = options._ssl_context + timeout = options.socket_timeout + # Create the Protocol early to prevent asyncio resource leaks during cleanup path + protocol = PyMongoProtocol(timeout=timeout) sock_adopted = False try: - ssl_context = options._ssl_context - timeout = options.socket_timeout - if ssl_context is None: - result = await asyncio.get_running_loop().create_connection( - lambda: PyMongoProtocol(timeout=timeout), sock=sock - ) + result = await asyncio.get_running_loop().create_connection(lambda: protocol, sock=sock) sock_adopted = True return AsyncNetworkingInterface(result) @@ -328,8 +327,8 @@ async def _configured_protocol_interface( try: # We have to pass hostname / ip address to wrap_socket # to use SSLContext.check_hostname. - transport, protocol = await asyncio.get_running_loop().create_connection( # type: ignore[call-overload] - lambda: PyMongoProtocol(timeout=timeout), + transport, _ = await asyncio.get_running_loop().create_connection( # type: ignore[call-overload] + lambda: protocol, sock=sock, server_hostname=host, ssl=ssl_context, @@ -360,12 +359,13 @@ async def _configured_protocol_interface( return AsyncNetworkingInterface((transport, protocol)) finally: - # If cancellation or any exception lands between socket creation and - # transport adoption, asyncio.create_connection has not registered - # cleanup for the sock. - # Close it ourselves to prevent leaks. if not sock_adopted: - sock.close() + # If the protocol owns the transport, it also adopted the socket and needs to be cleaned up from the transport + if protocol.transport is not None: + protocol.transport.abort() + # Otherwise the socket was never adopted, close it directly + else: + sock.close() def _create_connection(address: _Address, options: PoolOptions) -> socket.socket: From 29ba515060edce015bef5f5f50e3d4aeaae9c2e6 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Tue, 19 May 2026 12:56:39 -0400 Subject: [PATCH 7/7] Shield async connection closing --- pymongo/asynchronous/pool.py | 18 ++++++++++++------ pymongo/synchronous/pool.py | 18 ++++++++++++------ 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 6f9aa3492d..89157e9543 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -853,9 +853,12 @@ async def _reset( # publishing the PoolClearedEvent. if close: if not _IS_SYNC: - await asyncio.gather( - *[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets], # type: ignore[func-returns-value] - return_exceptions=True, + # Shield the closing of connections to avoid leaks + await asyncio.shield( + asyncio.gather( + *[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets], # type: ignore[func-returns-value] + return_exceptions=True, + ) ) else: for conn in sockets: @@ -890,9 +893,12 @@ async def _reset( interrupt_connections=interrupt_connections, ) if not _IS_SYNC: - await asyncio.gather( - *[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value] - return_exceptions=True, + # Shield the closing of connections to avoid leaks + await asyncio.shield( + asyncio.gather( + *[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value] + return_exceptions=True, + ) ) else: for conn in sockets: diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 9fd0011b1f..a3790cd9c5 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -851,9 +851,12 @@ def _reset( # publishing the PoolClearedEvent. if close: if not _IS_SYNC: - asyncio.gather( - *[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets], # type: ignore[func-returns-value] - return_exceptions=True, + # Shield the closing of connections to avoid leaks + asyncio.shield( + asyncio.gather( + *[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets], # type: ignore[func-returns-value] + return_exceptions=True, + ) ) else: for conn in sockets: @@ -888,9 +891,12 @@ def _reset( interrupt_connections=interrupt_connections, ) if not _IS_SYNC: - asyncio.gather( - *[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value] - return_exceptions=True, + # Shield the closing of connections to avoid leaks + asyncio.shield( + asyncio.gather( + *[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value] + return_exceptions=True, + ) ) else: for conn in sockets: