diff --git a/CHANGELOG.md b/CHANGELOG.md index a815c09..e0c702e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,9 @@ ## [Version 1.2.7](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.2.7) - Feature - 2026-02-18 - Detecting dialect for better csv decoding +- Adding mutual TLS authentication +- Fixing duplication of last line in csv APIs using the recipe +- Dumping API's response as a last resort ## [Version 1.2.6](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.2.6) - Feature - 2025-09-24 diff --git a/custom-recipes/api-connect/recipe.json b/custom-recipes/api-connect/recipe.json index 5842cd0..cc0fac3 100644 --- a/custom-recipes/api-connect/recipe.json +++ b/custom-recipes/api-connect/recipe.json @@ -291,6 +291,81 @@ "visibilityCondition": "model.auth_type!='secure_oauth' && model.auth_type!='secure_basic'", "defaultValue": false }, + { + "name": "force_csv_parameters", + "label": "Force CSV parameters", + "description": "", + "type": "BOOLEAN", + "defaultValue": false + }, + { + "name": "csv_delimiter", + "label": "Delimiter", + "description": "", + "type": "STRING", + "visibilityCondition": "model.force_csv_parameters==true" + }, + { + "name": "csv_doublequote", + "label": "Double quote", + "description": "", + "type": "SELECT", + "selectChoices":[ + {"value": null, "label": "Auto detect"}, + {"value": "double_quote", "label": "Double quote"}, + {"value": "not_double_quote", "label": "No double quote"} + ], + "visibilityCondition": "model.force_csv_parameters==true" + }, + { + "name": "csv_escapechar", + "label": "Escape char", + "description": "", + "type": "STRING", + "visibilityCondition": "model.force_csv_parameters==true" + }, + { + "name": "csv_lineterminator", + "label": "Line terminator", + "description": "", + "type": "STRING", + "visibilityCondition": "model.force_csv_parameters==true" + }, + { + "name": "csv_quotechar", + "label": "Quote char", + "description": "", + "type": "STRING", + "visibilityCondition": "model.force_csv_parameters==true" + }, + { + "name": "csv_quoting", + "label": "Quote", + "description": "", + "type": "SELECT", + "selectChoices":[ + {"value": null, "label": "Auto detect"}, + {"value": 0, "label": "Minimal"}, + {"value": 1, "label": "All"}, + {"value": 2, "label": "Non numeric"}, + {"value": 3, "label": "None"}, + {"value": 4, "label": "Strings"}, + {"value": 5, "label": "Not null"} + ], + "visibilityCondition": "model.force_csv_parameters==true" + }, + { + "name": "csv_skipinitialspace", + "label": "Skip initial space", + "description": "", + "type": "SELECT", + "selectChoices":[ + {"value": null, "label": "Auto detect"}, + {"value": "skip", "label": "Skip"}, + {"value": "not_skip", "label": "Do not skip"} + ], + "visibilityCondition": "model.force_csv_parameters==true" + }, { "name": "redirect_auth_header", "label": "Redirect authorization header", diff --git a/parameter-sets/credential/parameter-set.json b/parameter-sets/credential/parameter-set.json index 4196159..0c027f6 100644 --- a/parameter-sets/credential/parameter-set.json +++ b/parameter-sets/credential/parameter-set.json @@ -119,6 +119,27 @@ "label": "User key/values", "description": "User defined keys/values that can be used later in url, query string...", "type": "KEY_VALUE_LIST" + }, + { + "name": "use_mtls", + "label": "Use mTLS", + "description": "", + "type": "BOOLEAN", + "defaultValue": false + }, + { + "name": "mtls_certificate_path", + "label": "Path to certificate", + "description": "or full certificate starting with -----BEGIN and ending with END CERTIFICATE-----", + "type": "PASSWORD", + "visibilityCondition": "model.use_mtls==true" + }, + { + "name": "mtls_key_path", + "label": "Path to key", + "description": "or full key starting with -----BEGIN and ending with END PRIVATE KEY-----", + "type": "PASSWORD", + "visibilityCondition": "model.use_mtls==true" } ] } diff --git a/parameter-sets/secure-basic/parameter-set.json b/parameter-sets/secure-basic/parameter-set.json index 623d24d..a6b6809 100644 --- a/parameter-sets/secure-basic/parameter-set.json +++ b/parameter-sets/secure-basic/parameter-set.json @@ -38,6 +38,33 @@ "label": "NTLM" } ] + }, + { + "name": "use_mtls", + "label": "Use mTLS", + "description": "", + "type": "BOOLEAN", + "defaultValue": false + }, + { + "type": "SEPARATOR", + "label": "Warning", + "description": "Restricting access to this presset will not restrict access to the certificate and key files. This has to be done by setting the appropriate access rights on these two files.", + "visibilityCondition": "model.use_mtls==true" + }, + { + "name": "mtls_certificate_path", + "label": "Path to certificate", + "description": "or full certificate starting with -----BEGIN and ending with END CERTIFICATE-----", + "type": "PASSWORD", + "visibilityCondition": "model.use_mtls==true" + }, + { + "name": "mtls_key_path", + "label": "Path to key", + "description": "or full key starting with -----BEGIN and ending with END PRIVATE KEY-----", + "type": "PASSWORD", + "visibilityCondition": "model.use_mtls==true" } ] } diff --git a/parameter-sets/secure-oauth/parameter-set.json b/parameter-sets/secure-oauth/parameter-set.json index 464f9ac..72c6437 100644 --- a/parameter-sets/secure-oauth/parameter-set.json +++ b/parameter-sets/secure-oauth/parameter-set.json @@ -47,6 +47,33 @@ "label": "Domain", "description": "", "type": "STRING" + }, + { + "name": "use_mtls", + "label": "Use mTLS", + "description": "", + "type": "BOOLEAN", + "defaultValue": false + }, + { + "type": "SEPARATOR", + "label": "Warning", + "description": "1 - Restricting access to this presset will not restrict access to the certificate and key files. This has to be done by setting the appropriate access rights on these two files.\n2 - Because the OAuth flow is not controled by the plugin, mTLS cannot be used for the retrieving the access token itself.", + "visibilityCondition": "model.use_mtls==true" + }, + { + "name": "mtls_certificate_path", + "label": "Path to certificate", + "description": "or full certificate starting with -----BEGIN and ending with END CERTIFICATE-----", + "type": "PASSWORD", + "visibilityCondition": "model.use_mtls==true" + }, + { + "name": "mtls_key_path", + "label": "Path to key", + "description": "or full key starting with -----BEGIN and ending with END PRIVATE KEY-----", + "type": "PASSWORD", + "visibilityCondition": "model.use_mtls==true" } ] } diff --git a/plugin.json b/plugin.json index afb4273..88ad213 100644 --- a/plugin.json +++ b/plugin.json @@ -6,8 +6,7 @@ "description": "Retrieve data from any REST API", "author": "Dataiku (Alex Bourret)", "icon": "icon-rocket", - "category": "Connect", - "tags": ["API", "Recipe", "Dataset"], + "tags": ["Connector"], "url": "https://www.dataiku.com/product/plugins/api-connect/", "licenseInfo": "Apache Software License", "recipesCategory": "visual" diff --git a/python-connectors/api-connect_dataset/connector.json b/python-connectors/api-connect_dataset/connector.json index 754c507..7d234d6 100644 --- a/python-connectors/api-connect_dataset/connector.json +++ b/python-connectors/api-connect_dataset/connector.json @@ -238,6 +238,81 @@ "visibilityCondition": "model.auth_type!='secure_oauth' && model.auth_type!='secure_basic'", "defaultValue": false }, + { + "name": "force_csv_parameters", + "label": " ", + "description": "Force CSV parameters", + "type": "BOOLEAN", + "defaultValue": false + }, + { + "name": "csv_delimiter", + "label": "Delimiter", + "description": "", + "type": "STRING", + "visibilityCondition": "model.force_csv_parameters==true" + }, + { + "name": "csv_doublequote", + "label": "Double quote", + "description": "", + "type": "SELECT", + "selectChoices":[ + {"value": null, "label": "Auto detect"}, + {"value": "double_quote", "label": "Double quote"}, + {"value": "not_double_quote", "label": "No double quote"} + ], + "visibilityCondition": "model.force_csv_parameters==true" + }, + { + "name": "csv_escapechar", + "label": "Escape char", + "description": "", + "type": "STRING", + "visibilityCondition": "model.force_csv_parameters==true" + }, + { + "name": "csv_lineterminator", + "label": "Line terminator", + "description": "", + "type": "STRING", + "visibilityCondition": "model.force_csv_parameters==true" + }, + { + "name": "csv_quotechar", + "label": "Quote char", + "description": "", + "type": "STRING", + "visibilityCondition": "model.force_csv_parameters==true" + }, + { + "name": "csv_quoting", + "label": "Quote", + "description": "", + "type": "SELECT", + "selectChoices":[ + {"value": null, "label": "Auto detect"}, + {"value": 0, "label": "Minimal"}, + {"value": 1, "label": "All"}, + {"value": 2, "label": "Non numeric"}, + {"value": 3, "label": "None"}, + {"value": 4, "label": "Strings"}, + {"value": 5, "label": "Not null"} + ], + "visibilityCondition": "model.force_csv_parameters==true" + }, + { + "name": "csv_skipinitialspace", + "label": "Skip initial space", + "description": "", + "type": "SELECT", + "selectChoices":[ + {"value": null, "label": "Auto detect"}, + {"value": "skip", "label": "Skip"}, + {"value": "not_skip", "label": "Do not skip"} + ], + "visibilityCondition": "model.force_csv_parameters==true" + }, { "name": "redirect_auth_header", "label": " ", diff --git a/python-connectors/api-connect_dataset/connector.py b/python-connectors/api-connect_dataset/connector.py index fc155de..53e1609 100644 --- a/python-connectors/api-connect_dataset/connector.py +++ b/python-connectors/api-connect_dataset/connector.py @@ -33,6 +33,7 @@ def __init__(self, config, plugin_config): self.raw_output = endpoint_parameters.get("raw_output", None) self.maximum_number_rows = config.get("maximum_number_rows", -1) self.display_metadata = config.get("display_metadata", False) + self.csv_configuration = config def get_read_schema(self): # In this example, we don't specify a schema here, so DSS will infer the schema @@ -60,7 +61,7 @@ def generate_rows(self, dataset_schema=None, dataset_partitioning=None, record_count += 1 yield self.format_output(data, metadata) else: - csv_data = decode_csv_data(data) + csv_data = decode_csv_data(data, self.csv_configuration) if csv_data: record_count += len(csv_data) for row in csv_data: @@ -68,7 +69,7 @@ def generate_rows(self, dataset_schema=None, dataset_partitioning=None, else: record_count += 1 yield { - DKUConstants.API_RESPONSE_KEY: "{}".format(decode_bytes(data)) + DKUConstants.API_RESPONSE_KEY: decode_bytes(data) } if is_records_limit and record_count >= records_limit: break diff --git a/python-lib/dku_constants.py b/python-lib/dku_constants.py index d16088e..7e007f8 100644 --- a/python-lib/dku_constants.py +++ b/python-lib/dku_constants.py @@ -1,7 +1,7 @@ class DKUConstants(object): API_RESPONSE_KEY = "api_response" - FORBIDDEN_KEYS = ["token", "password", "api_key_value", "secure_token"] + FORBIDDEN_KEYS = ["token", "password", "api_key_value", "secure_token", "mtls_key_path", "mtls_certificate_path"] FORM_DATA_BODY_FORMAT = "FORM_DATA" - PLUGIN_VERSION = "1.2.7-beta.1" + PLUGIN_VERSION = "1.2.7-beta.5" RAW_BODY_FORMAT = "RAW" REPONSE_ERROR_KEY = "dku_error" diff --git a/python-lib/dku_utils.py b/python-lib/dku_utils.py index 3b65fa4..d9f59a6 100644 --- a/python-lib/dku_utils.py +++ b/python-lib/dku_utils.py @@ -39,7 +39,11 @@ def get_endpoint_parameters(configuration): "requests_per_minute", "pagination_type", "next_page_url_key", "is_next_page_url_relative", "next_page_url_base", - "top_key", "skip_key", "maximum_number_rows" + "top_key", "skip_key", "maximum_number_rows", + "use_mtls", "mtls_certificate_path", "mtls_key_path", + "force_csv_parameters", "csv_delimiter", + "csv_doublequote", "csv_escapechar", "csv_lineterminator", + "csv_quotechar", "csv_quoting", "csv_skipinitialspace" ] parameters = { endpoint_parameter: configuration.get(endpoint_parameter) for endpoint_parameter in endpoint_parameters if configuration.get(endpoint_parameter) is not None @@ -166,7 +170,7 @@ def xml_to_json(content): return json_response -def decode_csv_data(data): +def decode_csv_data(data, csv_configuation): import csv import io json_data = None @@ -180,7 +184,7 @@ def decode_csv_data(data): dialect.delimiter, dialect.doublequote, dialect.escapechar, - dialect.lineterminator, + dialect.lineterminator.encode(encoding="utf-8"), dialect.quotechar, dialect.quoting, dialect.skipinitialspace @@ -188,19 +192,67 @@ def decode_csv_data(data): ) except Exception as error: logger.error("Could not sniff csv dialect. Error={}".format(error)) - dialect = "excel" - try: - reader = csv.DictReader( - io.StringIO(data), - dialect=dialect - ) - json_data = list(reader) - except Exception as error: - logger.error("Could not extract csv data. Error={}. Trying method 2.".format(error)) + # dialect = "excel" + dialect = csv.Dialect() + dialect.delimiter = ',' + dialect.quotechar = '"' + dialect.doublequote = True + dialect.skipinitialspace = False + dialect.lineterminator = '\r\n' + dialect.quoting = 0 + dialect = update_csv_dialect(csv_configuation, dialect) + if not csv_configuation.get("force_csv_parameters", False): + # For back compatibility reason, if csv params are not forced, + # we try the old method first. + try: + reader = csv.DictReader( + io.StringIO(data), + dialect=dialect + ) + json_data = list(reader) + except Exception as error: + logger.error("Could not extract csv data. Error={}. Trying method 2.".format(error)) + json_data = decode_csv_data_m2(data, dialect) + else: + logger.error("CSV parameters are forced, trying method 2") json_data = decode_csv_data_m2(data, dialect) return json_data +def update_csv_dialect(config, input_dialect): + if config.get("force_csv_parameters", False): + logger.info("Updating csv parameters with ") + csv_delimiter = config.get("csv_delimiter") + if csv_delimiter: + input_dialect.delimiter = csv_delimiter + logger.info("delimiter={}".format(csv_delimiter)) + csv_doublequote = config.get("csv_doublequote", None) + if csv_doublequote: + input_dialect.doublequote = csv_doublequote == "double_quote" + logger.info("doublequote={}".format(input_dialect.doublequote)) + csv_escapechar = config.get("csv_escapechar", "") + if csv_escapechar: + input_dialect.escapechar = csv_escapechar + logger.info("escapechar={}".format(csv_escapechar)) + csv_lineterminator = config.get("csv_lineterminator", "") + if csv_lineterminator: + input_dialect.lineterminator = csv_lineterminator + logger.info("lineterminator={}".format(csv_lineterminator)) + csv_quotechar = config.get("csv_quotechar", "") + if csv_quotechar: + input_dialect.quotechar = csv_quotechar + logger.info("quotechar={}".format(csv_quotechar)) + csv_quoting = config.get("csv_quoting", None) + if csv_quoting is not None: + input_dialect.quoting = csv_quoting + logger.info("quoting={}".format(csv_quoting)) + csv_skipinitialspace = config.get("csv_skipinitialspace", None) + if csv_skipinitialspace: + input_dialect.skipinitialspace = csv_skipinitialspace == "skip" + logger.info("skipinitialspace={}".format(input_dialect.skipinitialspace)) + return input_dialect + + def decode_csv_data_m2(data, dialect): import csv json_data = None diff --git a/python-lib/rest_api_client.py b/python-lib/rest_api_client.py index 170ad41..cb4e824 100644 --- a/python-lib/rest_api_client.py +++ b/python-lib/rest_api_client.py @@ -1,6 +1,7 @@ import requests import time import copy +import tempfile from pagination import Pagination from safe_logger import SafeLogger from loop_detector import LoopDetector @@ -59,6 +60,22 @@ def __init__(self, credential, secure_credentials, endpoint, custom_key_values={ self.requests_kwargs.update({"verify": False}) else: self.requests_kwargs.update({"verify": True}) + if credential.get("use_mtls", False): + mtls_certificate_path = credential.get("mtls_certificate_path") + mtls_key_path = credential.get("mtls_key_path") + self.requests_kwargs.update( + { + "cert": (mtls_certificate_path, mtls_key_path) + } + ) + if secure_credentials.get("use_mtls", False): + mtls_certificate_path = secure_credentials.get("mtls_certificate_path") + mtls_key_path = secure_credentials.get("mtls_key_path") + self.requests_kwargs.update( + { + "cert": (mtls_certificate_path, mtls_key_path) + } + ) self.redirect_auth_header = endpoint.get("redirect_auth_header", False) self.timeout = endpoint.get("timeout", -1) if self.timeout > 0: @@ -168,14 +185,35 @@ def request(self, method, url, can_raise_exeption=True, **kwargs): def request_with_redirect_retry(self, method, url, **kwargs): # In case of redirection to another domain, the authorization header is not kept # If redirect_auth_header is true, another attempt is made with initial headers to the redirected url - response = self.session.request(method, url, **kwargs) + response = self.request_with_cert(method, url, **kwargs) if self.redirect_auth_header and not response.url.startswith(url): redirection_kwargs = copy.deepcopy(kwargs) redirection_kwargs.pop("params", None) # params are contained in the redirected url logger.warning("Redirection ! Accessing endpoint {} with initial authorization headers".format(response.url)) - response = self.session.request(method, response.url, **redirection_kwargs) + response = self.request_with_cert(method, response.url, **redirection_kwargs) return response + def request_with_cert(self, method, url, **kwargs): + cert = kwargs.get("cert", None) + if cert and len(cert) == 2: + if cert[0].startswith("-----BEGIN CERTIFICATE") and cert[1].startswith("-----BEGIN "): + logger.info("mTLS certificate and key are strings") + response = None + with tempfile.NamedTemporaryFile(mode="w", suffix=".crt") as tmp_certificate: + with tempfile.NamedTemporaryFile(mode="w", suffix=".key") as tmp_key: + tmp_certificate.write( + normalize_key(cert[0]) + ) + tmp_certificate.seek(0) + tmp_key.write( + normalize_key(cert[1]) + ) + tmp_key.seek(0) + kwargs["cert"] = (tmp_certificate.name, tmp_key.name) + response = self.session.request(method, url, **kwargs) + return response + return self.session.request(method, url, **kwargs) + def paginated_api_call(self, can_raise_exeption=True): if self.pagination.params_must_be_blanked: self.requests_kwargs["params"] = {} @@ -262,3 +300,20 @@ def get_headers(response): if isinstance(response, requests.Response): return response.headers return None + + +def normalize_key(key): + PROTECTED_EXPRESSIONS = [ + "BEGIN CERTIFICATE", "END CERTIFICATE", + "BEGIN PRIVATE KEY", "END PRIVATE KEY", + "BEGIN RSA PRIVATE KEY", "END RSA PRIVATE KEY" + ] + tempo_text = str(key) + for expression_to_protect in PROTECTED_EXPRESSIONS: + protected_form = expression_to_protect.replace(" ", "") + tempo_text = tempo_text.replace(expression_to_protect, protected_form) + tempo_text = tempo_text.replace(" ", "\n") + for expression_to_protect in PROTECTED_EXPRESSIONS: + protected_form = expression_to_protect.replace(" ", "") + tempo_text = tempo_text.replace(protected_form, expression_to_protect) + return tempo_text diff --git a/python-lib/rest_api_recipe_session.py b/python-lib/rest_api_recipe_session.py index 8a474ba..cb5ad3c 100644 --- a/python-lib/rest_api_recipe_session.py +++ b/python-lib/rest_api_recipe_session.py @@ -1,7 +1,7 @@ from dataikuapi.utils import DataikuException from rest_api_client import RestAPIClient from safe_logger import SafeLogger -from dku_utils import parse_keys_for_json, get_value_from_path, decode_csv_data, de_NaN +from dku_utils import parse_keys_for_json, get_value_from_path, decode_csv_data, de_NaN, decode_bytes from dku_constants import DKUConstants import copy import json @@ -29,6 +29,7 @@ def __init__(self, custom_key_values, credential_parameters, secure_credentials, self.is_row_limit = (self.maximum_number_rows > 0) self.behaviour_when_error = behaviour_when_error or "add-error-column" self.can_raise = self.behaviour_when_error == "raise" + self.csv_configuration = endpoint_parameters @staticmethod def get_column_to_parameter_dict(parameter_columns, parameter_renamings): @@ -111,9 +112,14 @@ def retrieve_next_page(self, is_raw_output): if is_error_message(json_response): base_row.update(parse_keys_for_json(json_response)) else: - base_row.update({ - DKUConstants.API_RESPONSE_KEY: json.dumps(json_response) - }) + try: + base_row.update({ + DKUConstants.API_RESPONSE_KEY: json.dumps(json_response) + }) + except Exception: + base_row.update({ + DKUConstants.API_RESPONSE_KEY: decode_bytes(json_response) + }) else: if isinstance(json_response, dict): base_row.update(parse_keys_for_json(json_response)) @@ -125,8 +131,16 @@ def retrieve_next_page(self, is_raw_output): base_row.update(self.initial_parameter_columns) page_rows.append(base_row) else: - json_response = decode_csv_data(json_response) - for row in json_response: + decoded_csv_data = decode_csv_data(json_response, self.csv_configuration) + is_api_returning_dict = False + if not decoded_csv_data and json_response: + logger.warning("Data is not in CSV format. Dumping it in text mode.") + decoded_csv_data = [ + { + DKUConstants.API_RESPONSE_KEY: decode_bytes(json_response) + } + ] + for row in decoded_csv_data: base_row = copy.deepcopy(metadata) base_row.update(parse_keys_for_json(row)) base_row.update(self.initial_parameter_columns) @@ -140,7 +154,7 @@ def format_page_rows(self, data_rows, is_raw_output, metadata=None): page_rows = [] metadata = metadata or {} if type(data_rows) in [str, bytes]: - data_rows = decode_csv_data(data_rows) + data_rows = decode_csv_data(data_rows, self.csv_configuration) if type(data_rows) in [list]: for data_row in data_rows: base_row = copy.deepcopy(self.initial_parameter_columns) diff --git a/tests/python/integration/test_scenario.py b/tests/python/integration/test_scenario.py index 63c41af..315b6fc 100644 --- a/tests/python/integration/test_scenario.py +++ b/tests/python/integration/test_scenario.py @@ -57,3 +57,7 @@ def test_run_api_connect_xml_handling(user_dss_clients): def test_run_api_connect_parameters_renaming(user_dss_clients): dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id="COLUMNPARAMETERRENAMING") + + +def test_run_api_connect_mtls(user_dss_clients): + dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id="MTLS")