From a4b605116b035663fa0ff53335dd90dc0d3baf17 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 27 Feb 2026 09:36:26 +0000 Subject: [PATCH 1/3] feat: log concurrency configuration at startup for debugging Co-Authored-By: unknown <> --- .../sources/declarative/concurrent_declarative_source.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 45fe6aa2d..20e9251f7 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -233,6 +233,13 @@ def __init__( concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2 + self.logger.info( + "Concurrency configuration: concurrency_level=%d, initial_number_of_partitions_to_generate=%d, concurrency_level_from_manifest=%s", + concurrency_level, + initial_number_of_partitions_to_generate, + "defined" if concurrency_level_from_manifest else "not_defined", + ) + self._concurrent_source = ConcurrentSource.create( num_workers=concurrency_level, initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate, From 69fb8982e541e15187449491910f23cc2348acfa Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 27 Feb 2026 16:28:34 +0000 Subject: [PATCH 2/3] feat: enhance concurrency logging to differentiate config vs default values Co-Authored-By: unknown <> --- .../concurrent_declarative_source.py | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 20e9251f7..64b711eb6 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -233,12 +233,23 @@ def __init__( concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2 - self.logger.info( - "Concurrency configuration: concurrency_level=%d, initial_number_of_partitions_to_generate=%d, concurrency_level_from_manifest=%s", - concurrency_level, - initial_number_of_partitions_to_generate, - "defined" if concurrency_level_from_manifest else "not_defined", - ) + if concurrency_level_from_manifest: + raw_default_concurrency = concurrency_level_from_manifest.get("default_concurrency", "N/A") + self.logger.info( + "Concurrency configuration: concurrency_level=%d, initial_number_of_partitions_to_generate=%d, " + "source=manifest (expression=%s), config=%s", + concurrency_level, + initial_number_of_partitions_to_generate, + raw_default_concurrency, + {k: v for k, v in (config or {}).items() if "worker" in k.lower() or "concurren" in k.lower()}, + ) + else: + self.logger.info( + "Concurrency configuration: concurrency_level=%d, initial_number_of_partitions_to_generate=%d, " + "source=default (_LOWEST_SAFE_CONCURRENCY_LEVEL)", + concurrency_level, + initial_number_of_partitions_to_generate, + ) self._concurrent_source = ConcurrentSource.create( num_workers=concurrency_level, From 4ea2fe0e42618b3cdb3d012f09b6e0fdc1d7adb6 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 27 Feb 2026 16:29:53 +0000 Subject: [PATCH 3/3] style: fix ruff formatting Co-Authored-By: unknown <> --- .../declarative/concurrent_declarative_source.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 64b711eb6..31f4230ac 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -234,14 +234,20 @@ def __init__( initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2 if concurrency_level_from_manifest: - raw_default_concurrency = concurrency_level_from_manifest.get("default_concurrency", "N/A") + raw_default_concurrency = concurrency_level_from_manifest.get( + "default_concurrency", "N/A" + ) self.logger.info( "Concurrency configuration: concurrency_level=%d, initial_number_of_partitions_to_generate=%d, " "source=manifest (expression=%s), config=%s", concurrency_level, initial_number_of_partitions_to_generate, raw_default_concurrency, - {k: v for k, v in (config or {}).items() if "worker" in k.lower() or "concurren" in k.lower()}, + { + k: v + for k, v in (config or {}).items() + if "worker" in k.lower() or "concurren" in k.lower() + }, ) else: self.logger.info(