From 2434d266901d222f322c5ef6b4c060c2e2c2c993 Mon Sep 17 00:00:00 2001
From: TheR1sing3un
Date: Wed, 29 Apr 2026 18:30:04 +0800
Subject: [PATCH] [python] Make HTTP timeout/retry/keep-alive configurable via CatalogOptions

The REST HttpClient hardcoded its retry count and ignored its own
session timeout (the requests library does not honour Session.timeout;
the timeout must be passed to session.request()).

Introduce five CatalogOptions so users can tune REST behaviour without
monkey-patching:

* http.connect-timeout     (int, seconds, default 180)
* http.read-timeout        (int, seconds, default 180)
* http.max-connect-retries (int, default 3)
* http.max-read-retries    (int, default 3; also used as the retry
                            limit for 429/502/503/504 status responses)
* http.keep-alive          (bool, default true)

HttpClient now accepts an optional Options argument, reads these keys,
applies the timeout via session.request(timeout=...), separates connect
and read retry counters in ExponentialRetry (total=None lets each type
of retry govern independently), and emits Connection: close when
keep-alive is disabled. RESTApi forwards its options through.

Behaviour change: connect errors were previously never retried
(connect=0); they are now retried up to http.max-connect-retries times
(3 by default). Set http.max-connect-retries to 0 to keep the old
fail-fast behaviour.

Update token_loader.py to use the new ExponentialRetry signature, and
add HttpClient option-coverage tests alongside the retry tests.
--- paimon-python/pypaimon/api/client.py | 41 ++++++++++++----- paimon-python/pypaimon/api/rest_api.py | 2 +- paimon-python/pypaimon/api/token_loader.py | 2 +- .../pypaimon/common/options/config.py | 10 +++++ .../pypaimon/tests/rest/client_test.py | 43 +++++++++++++++++- .../rest/test_exponential_retry_strategy.py | 45 ++++++++++++------- 6 files changed, 113 insertions(+), 30 deletions(-) diff --git a/paimon-python/pypaimon/api/client.py b/paimon-python/pypaimon/api/client.py index c2f38912f432..517eae210bcb 100644 --- a/paimon-python/pypaimon/api/client.py +++ b/paimon-python/pypaimon/api/client.py @@ -149,16 +149,18 @@ class ExponentialRetry: adapter: HTTPAdapter - def __init__(self, max_retries: int = 5): - retry = self.__create_retry_strategy(max_retries) + def __init__(self, connect_retries: int = 3, read_retries: int = 3): + retry = self.__create_retry_strategy(connect_retries, read_retries) self.adapter = HTTPAdapter(max_retries=retry) @staticmethod - def __create_retry_strategy(max_retries: int) -> Retry: + def __create_retry_strategy(connect_retries: int, read_retries: int) -> Retry: retry_kwargs = { - 'total': max_retries, - 'read': max_retries, - 'connect': 0, + # total=None means no overall cap; per-type counters govern independently + 'total': None, + 'connect': connect_retries, + 'read': read_retries, + 'status': read_retries, 'backoff_factor': 1, 'status_forcelist': [429, 502, 503, 504], 'raise_on_status': False, @@ -264,18 +266,36 @@ class HttpClient(RESTClient): REQUEST_ID_KEY = "x-request-id" DEFAULT_REQUEST_ID = "unknown" - def __init__(self, uri: str): + def __init__(self, uri: str, options=None): + from pypaimon.common.options.config import CatalogOptions + self.logger = logging.getLogger(self.__class__.__name__) self.uri = _normalize_uri(uri) self.error_handler = DefaultErrorHandler.get_instance() self.session = requests.Session() - retry_interceptor = ExponentialRetry(max_retries=3) + if options is not None: + connect_timeout = 
options.get(CatalogOptions.HTTP_CONNECT_TIMEOUT) + read_timeout = options.get(CatalogOptions.HTTP_READ_TIMEOUT) + connect_retries = options.get(CatalogOptions.HTTP_MAX_CONNECT_RETRIES) + read_retries = options.get(CatalogOptions.HTTP_MAX_READ_RETRIES) + keep_alive = options.get(CatalogOptions.HTTP_KEEP_ALIVE) + else: + connect_timeout = CatalogOptions.HTTP_CONNECT_TIMEOUT.default_value() + read_timeout = CatalogOptions.HTTP_READ_TIMEOUT.default_value() + connect_retries = CatalogOptions.HTTP_MAX_CONNECT_RETRIES.default_value() + read_retries = CatalogOptions.HTTP_MAX_READ_RETRIES.default_value() + keep_alive = CatalogOptions.HTTP_KEEP_ALIVE.default_value() + + self._timeout = (connect_timeout, read_timeout) + retry_interceptor = ExponentialRetry( + connect_retries=connect_retries, read_retries=read_retries) self.session.mount("http://", retry_interceptor.adapter) self.session.mount("https://", retry_interceptor.adapter) - self.session.timeout = (180, 180) + if not keep_alive: + self.session.headers.update({"Connection": "close"}) self.session.headers.update({ 'Accept': 'application/json' @@ -361,7 +381,8 @@ def _execute_request(self, method: str, url: str, method=method, url=url, data=data.encode('utf-8') if data else None, - headers=headers + headers=headers, + timeout=self._timeout, ) duration_ms = (int(time.time() * 1_000_000_000) - start_time) // 1_000_000 response_request_id = response.headers.get(self.REQUEST_ID_KEY, self.DEFAULT_REQUEST_ID) diff --git a/paimon-python/pypaimon/api/rest_api.py b/paimon-python/pypaimon/api/rest_api.py index e2244211315c..458905265187 100755 --- a/paimon-python/pypaimon/api/rest_api.py +++ b/paimon-python/pypaimon/api/rest_api.py @@ -75,7 +75,7 @@ def __init__(self, options: Union[Options, Dict[str, str]], config_required: boo raise ValueError("URI cannot be empty") self.logger = logging.getLogger(self.__class__.__name__) - self.client = HttpClient(uri) + self.client = HttpClient(uri, options) auth_provider = 
AuthProviderFactory.create_auth_provider(options) base_headers = RESTUtil.extract_prefix_map(options, self.HEADER_PREFIX) diff --git a/paimon-python/pypaimon/api/token_loader.py b/paimon-python/pypaimon/api/token_loader.py index 1e8aa7834931..a0a060446efa 100644 --- a/paimon-python/pypaimon/api/token_loader.py +++ b/paimon-python/pypaimon/api/token_loader.py @@ -94,7 +94,7 @@ def __init__(self, connect_timeout: int = 180, read_timeout: int = 180, self.session = requests.Session() # Add retry adapter - retry_interceptor = ExponentialRetry(max_retries=3) + retry_interceptor = ExponentialRetry(connect_retries=3, read_retries=3) adapter = HTTPAdapter(max_retries=retry_interceptor.adapter) self.session.mount("http://", adapter) diff --git a/paimon-python/pypaimon/common/options/config.py b/paimon-python/pypaimon/common/options/config.py index ee11d12723e9..17a14bb810b8 100644 --- a/paimon-python/pypaimon/common/options/config.py +++ b/paimon-python/pypaimon/common/options/config.py @@ -84,6 +84,16 @@ class CatalogOptions: PREFIX = ConfigOptions.key("prefix").string_type().no_default_value().with_description("Prefix") HTTP_USER_AGENT_HEADER = ConfigOptions.key( "header.HTTP_USER_AGENT").string_type().no_default_value().with_description("HTTP User Agent header") + HTTP_CONNECT_TIMEOUT = ConfigOptions.key("http.connect-timeout").int_type() \ + .default_value(180).with_description("HTTP connect timeout in seconds") + HTTP_READ_TIMEOUT = ConfigOptions.key("http.read-timeout").int_type() \ + .default_value(180).with_description("HTTP read timeout in seconds") + HTTP_MAX_CONNECT_RETRIES = ConfigOptions.key("http.max-connect-retries").int_type() \ + .default_value(3).with_description("HTTP max retries for connect errors") + HTTP_MAX_READ_RETRIES = ConfigOptions.key("http.max-read-retries").int_type() \ + .default_value(3).with_description("HTTP max retries for read/status errors (429/502/503/504)") + HTTP_KEEP_ALIVE = ConfigOptions.key("http.keep-alive").boolean_type() \ + 
.default_value(True).with_description("Enable HTTP keep-alive") BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2 ** 31 - 1 diff --git a/paimon-python/pypaimon/tests/rest/client_test.py b/paimon-python/pypaimon/tests/rest/client_test.py index 6f381214c7ab..807fe94d99c8 100644 --- a/paimon-python/pypaimon/tests/rest/client_test.py +++ b/paimon-python/pypaimon/tests/rest/client_test.py @@ -16,7 +16,9 @@ import unittest -from pypaimon.api.client import _parse_error_response +from pypaimon.api.client import HttpClient, _parse_error_response +from pypaimon.common.options import Options +from pypaimon.common.options.config import CatalogOptions class HttpClientTest(unittest.TestCase): @@ -58,5 +60,44 @@ def test_parse_error_response_with_unparsable_json(self): self.assertEqual(error.resource_name, '') +class HttpClientHttpOptionsTest(unittest.TestCase): + """HttpClient honours CatalogOptions for timeout / retries / keep-alive.""" + + def test_defaults_when_options_is_none(self): + client = HttpClient("http://localhost:8080") + self.assertEqual( + client._timeout, + (CatalogOptions.HTTP_CONNECT_TIMEOUT.default_value(), + CatalogOptions.HTTP_READ_TIMEOUT.default_value())) + self.assertNotEqual( + client.session.headers.get("Connection", ""), "close", + "keep-alive defaults to True; no Connection: close header expected") + + def test_custom_timeouts_applied(self): + opts = Options({ + "http.connect-timeout": "30", + "http.read-timeout": "45", + }) + client = HttpClient("http://localhost:8080", opts) + self.assertEqual(client._timeout, (30, 45)) + + def test_keep_alive_disabled_sets_connection_close(self): + opts = Options({"http.keep-alive": "false"}) + client = HttpClient("http://localhost:8080", opts) + self.assertEqual(client.session.headers.get("Connection"), "close") + + def test_custom_retry_counts_applied(self): + opts = Options({ + "http.max-connect-retries": "7", + "http.max-read-retries": "9", + }) + client = HttpClient("http://localhost:8080", opts) + adapter = 
client.session.get_adapter("http://localhost:8080") + retry = adapter.max_retries + self.assertEqual(retry.connect, 7) + self.assertEqual(retry.read, 9) + self.assertEqual(retry.status, 9) + + if __name__ == '__main__': unittest.main() diff --git a/paimon-python/pypaimon/tests/rest/test_exponential_retry_strategy.py b/paimon-python/pypaimon/tests/rest/test_exponential_retry_strategy.py index b6ea91e869d5..f4ac34f25f95 100644 --- a/paimon-python/pypaimon/tests/rest/test_exponential_retry_strategy.py +++ b/paimon-python/pypaimon/tests/rest/test_exponential_retry_strategy.py @@ -27,38 +27,49 @@ class TestExponentialRetryStrategy(unittest.TestCase): - def setUp(self): - self.retry_strategy = ExponentialRetry(max_retries=5) - def test_basic_retry(self): - retry = ExponentialRetry._ExponentialRetry__create_retry_strategy(5) - - self.assertEqual(retry.total, 5) - self.assertEqual(retry.read, 5) - self.assertEqual(retry.connect, 0) # Connection errors should not retry - + retry = ExponentialRetry._ExponentialRetry__create_retry_strategy(5, 3) + + self.assertIsNone(retry.total) + self.assertEqual(retry.connect, 5) + self.assertEqual(retry.read, 3) + self.assertEqual(retry.status, 3) + self.assertIn(429, retry.status_forcelist) # Too Many Requests self.assertIn(503, retry.status_forcelist) # Service Unavailable self.assertNotIn(404, retry.status_forcelist) - def test_no_retry_on_connect_error(self): + def test_retry_on_connect_error(self): + retry_strategy = ExponentialRetry(connect_retries=2, read_retries=0) session = requests.Session() - session.mount("http://", self.retry_strategy.adapter) - session.mount("https://", self.retry_strategy.adapter) - session.timeout = (1, 1) + session.mount("http://", retry_strategy.adapter) + session.mount("https://", retry_strategy.adapter) start_time = time.time() - + try: session.get("http://192.168.255.255:9999", timeout=(1, 1)) self.fail("Expected ConnectionError") except (ConnectionError, ConnectTimeout, Timeout, NewConnectionError, 
MaxRetryError): elapsed = time.time() - start_time - self.assertLess( - elapsed, 5.0, - f"Connection error took {elapsed:.2f}s, should fail quickly without retry" + # connect_retries=2 with backoff_factor=1 → at least 1s of backoff between attempts + self.assertGreaterEqual( + elapsed, 2.0, + f"Connection error took {elapsed:.2f}s, expected retries with backoff" ) + def test_no_connect_retry_but_read_retry(self): + retry = ExponentialRetry._ExponentialRetry__create_retry_strategy(0, 5) + self.assertEqual(retry.connect, 0) + self.assertEqual(retry.read, 5) + self.assertEqual(retry.status, 5) + + def test_zero_retries(self): + retry = ExponentialRetry._ExponentialRetry__create_retry_strategy(0, 0) + self.assertEqual(retry.connect, 0) + self.assertEqual(retry.read, 0) + self.assertEqual(retry.status, 0) + if __name__ == '__main__': unittest.main()