diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 45fe6aa2d..31f4230ac 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -233,6 +233,30 @@ def __init__( concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2 + if concurrency_level_from_manifest: + raw_default_concurrency = concurrency_level_from_manifest.get( + "default_concurrency", "N/A" + ) + self.logger.info( + "Concurrency configuration: concurrency_level=%d, initial_number_of_partitions_to_generate=%d, " + "source=manifest (expression=%s), config=%s", + concurrency_level, + initial_number_of_partitions_to_generate, + raw_default_concurrency, + { + k: v + for k, v in (config or {}).items() + if "worker" in k.lower() or "concurren" in k.lower() + }, + ) + else: + self.logger.info( + "Concurrency configuration: concurrency_level=%d, initial_number_of_partitions_to_generate=%d, " + "source=default (_LOWEST_SAFE_CONCURRENCY_LEVEL)", + concurrency_level, + initial_number_of_partitions_to_generate, + ) + self._concurrent_source = ConcurrentSource.create( num_workers=concurrency_level, initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,