Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion ld_eventsource/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class _HttpClientImpl:
def __init__(self, params: _HttpConnectParams, logger: Logger):
self.__params = params
self.__pool = params.pool or PoolManager()
self.__should_close_pool = params.pool is not None
self.__should_close_pool = params.pool is None
self.__logger = logger

def connect(self, last_event_id: Optional[str]) -> Tuple[Iterator[bytes], Callable, Dict[str, Any]]:
Expand Down Expand Up @@ -125,4 +125,14 @@ def close():

def close(self):
if self.__should_close_pool:
# Close pooled connections (sends the TCP FIN) before dropping the pool.
# PoolManager.clear() alone drops the pool dict without closing the
# underlying sockets, leaving the connection open until garbage collection.
for key in list(self.__pool.pools.keys()):
connection_pool = self.__pool.pools.get(key)
if connection_pool is not None:
try:
connection_pool.close()
except Exception:
self.__logger.debug("Error closing connection pool", exc_info=True)
self.__pool.clear()
33 changes: 33 additions & 0 deletions ld_eventsource/testing/test_http_connect_strategy.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
from unittest import mock

from urllib3 import PoolManager
from urllib3.exceptions import ProtocolError

from ld_eventsource import *
Expand Down Expand Up @@ -239,3 +241,34 @@ def test_fault_exposes_headers_from_http_error():
assert fault.headers is not None
assert fault.headers.get('Retry-After') == '60'
assert fault.headers.get('X-Error-Code') == 'SERVICE_UNAVAILABLE'


def test_close_leaves_caller_supplied_pool_open():
# The caller owns the lifecycle of a pool it supplies, so close() must not
# tear it down. (Regression: this ownership flag was inverted, causing the
# client to clear() a caller-supplied pool out from under the caller.)
pool = mock.MagicMock(spec=PoolManager)
client = ConnectStrategy.http("http://test", pool=pool).create_client(logger())

client.close()

pool.clear.assert_not_called()


def test_close_closes_pool_it_created():
# When the client creates its own pool (no pool supplied), close() must close
# the pooled connections synchronously -- sending the TCP FIN now -- rather
# than leaving the sockets open until garbage collection. PoolManager.clear()
# alone does not close them on urllib3 2.x.
connection_pool = mock.Mock()
created_pool = mock.MagicMock()
created_pool.pools.keys.return_value = ['poolkey']
created_pool.pools.get.return_value = connection_pool

with mock.patch('ld_eventsource.http.PoolManager', return_value=created_pool):
client = ConnectStrategy.http("http://test").create_client(logger())

client.close()

connection_pool.close.assert_called_once()
created_pool.clear.assert_called_once()
Loading