
feat: Add block_simultaneous_read with top-level stream_groups interface #870

Open
Anatolii Yatsuk (tolik0) wants to merge 26 commits into main from
tolik0/concurrent-source/add-block_simultaneous_read

Conversation

@tolik0
Contributor

Anatolii Yatsuk (tolik0) commented Dec 30, 2025

Summary

Adds a block_simultaneous_read feature to prevent multiple streams from running concurrently when they share the same resource. The feature is configured via a top-level stream_groups structure in the manifest (rather than per-stream properties).

Interface

stream_groups:
  crm_objects:
    streams:
      - "#/definitions/deals_property_history_stream"
      - "#/definitions/companies_property_history_stream"
    action: BlockSimultaneousSyncsAction

  • stream_groups is a top-level manifest property (alongside streams, definitions, check, etc.)
  • Each group has a name (e.g. crm_objects), a list of stream $ref references, and an action
  • BlockSimultaneousSyncsAction is the only action type for now
  • Streams in the same group will not run concurrently; streams in different groups run freely
  • Important: Child streams that depend on parent streams should not be placed in the same group as their parents — doing so would cause a deadlock (the child needs to read the parent during partition generation). A validation check enforces this at config time.
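The group resolution and the deadlock validation described above can be sketched roughly as follows. This is a hedged, self-contained illustration: the helper names (`build_group_map`, `validate_no_parent_child_groups`), the `parents` mapping, and the ref-to-name resolution are assumptions for clarity, not the CDK's actual API.

```python
# Hypothetical sketch: map "#/definitions/<name>" refs to group names and
# reject any stream that shares a blocking group with one of its parents.

def build_group_map(stream_groups: dict) -> dict:
    """Map each referenced stream name to the group it belongs to."""
    name_to_group = {}
    for group_name, group in stream_groups.items():
        for ref in group["streams"]:
            # "#/definitions/deals_property_history_stream" -> "deals_property_history_stream"
            name_to_group[ref.rsplit("/", 1)[-1]] = group_name
    return name_to_group


def validate_no_parent_child_groups(name_to_group: dict, parents: dict) -> None:
    """Raise if a stream and any of its parents share a blocking group."""
    for stream, group in name_to_group.items():
        for parent in parents.get(stream, []):
            if name_to_group.get(parent) == group:
                raise ValueError(
                    f"Stream '{stream}' and its parent '{parent}' share group "
                    f"'{group}'; the child reads the parent during partition "
                    f"generation, so blocking them together would deadlock."
                )


groups = {
    "crm_objects": {
        "streams": [
            "#/definitions/deals_property_history_stream",
            "#/definitions/companies_property_history_stream",
        ],
        "action": "BlockSimultaneousSyncsAction",
    }
}
mapping = build_group_map(groups)
validate_no_parent_child_groups(mapping, parents={})
```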

Implementation

  1. Schema (declarative_component_schema.yaml): Removed per-stream block_simultaneous_read property from DeclarativeStream. Added top-level stream_groups with StreamGroup and BlockSimultaneousSyncsAction definitions.

  2. Pydantic models (declarative_component_schema.py): Added BlockSimultaneousSyncsAction, StreamGroup classes. Added stream_groups: Optional[Dict[str, StreamGroup]] to DeclarativeSource1/DeclarativeSource2.

  3. ConcurrentDeclarativeSource._apply_stream_groups(): Resolves stream groups from actual stream instances after stream creation. Validates that no stream shares a group with any of its parent streams (deadlock prevention). Sets block_simultaneous_read on matching DefaultStream instances.

  4. DefaultStream.get_partition_router(): New helper method that safely traverses the partition_generator → stream_slicer → partition_router chain using isinstance checks, replacing the hasattr chains in ConcurrentReadProcessor.

  5. ConcurrentReadProcessor (core blocking logic): Uses group-based deferral/retry with parent-child awareness. Added is_done() safety check that raises AirbyteTracedException if streams remain in the partition generation queue after all streams are marked done.
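The helper in item 4 can be pictured roughly like this. The minimal stand-in classes below are hypothetical; only the traversal shape (isinstance checks instead of hasattr chains) mirrors the description.

```python
from typing import Optional


# Minimal stand-ins for the CDK classes; the names follow the PR description,
# but these definitions are illustrative only.
class PartitionRouter:
    pass


class SlicerWithRouter:  # stands in for a cursor/slicer that owns a router
    def __init__(self, partition_router: PartitionRouter) -> None:
        self._partition_router = partition_router


class StreamSlicerPartitionGenerator:
    def __init__(self, stream_slicer) -> None:
        self._stream_slicer = stream_slicer


class DefaultStream:
    def __init__(self, partition_generator) -> None:
        self._stream_partition_generator = partition_generator

    def get_partition_router(self) -> Optional[PartitionRouter]:
        """Walk partition_generator -> stream_slicer -> partition_router safely."""
        generator = self._stream_partition_generator
        if not isinstance(generator, StreamSlicerPartitionGenerator):
            return None
        slicer = generator._stream_slicer
        if not isinstance(slicer, SlicerWithRouter):
            return None
        return slicer._partition_router
```

A stream built without a slicer-backed generator simply returns None instead of tripping an AttributeError mid-chain.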

Blocking Behavior

  • When a stream is about to start, the CDK checks if another stream in the same group (or any parent stream with a blocking group) is active
  • Blocked streams are deferred to the end of the queue and retried when the blocker completes
  • Streams in different groups run concurrently without interference
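A toy model of the three bullets above (the queue and group bookkeeping below are simplified stand-ins, not the ConcurrentReadProcessor internals): streams started in the same "tick" run concurrently, and a stream whose group is already active is deferred to the next tick.

```python
from collections import deque


def schedule_ticks(streams: list, group_of: dict) -> list:
    """Group streams into ticks; streams within one tick run concurrently."""
    pending = deque(streams)
    ticks = []
    while pending:
        tick, active_groups, deferred = [], set(), deque()
        while pending:
            stream = pending.popleft()
            group = group_of.get(stream, "")
            if group and group in active_groups:
                deferred.append(stream)  # blocked: retry once the blocker finishes
            else:
                tick.append(stream)
                if group:
                    active_groups.add(group)
        ticks.append(tick)
        pending = deferred
    return ticks


# "a" and "b" share a group, "c" does not: "b" waits for "a", "c" runs freely.
print(schedule_ticks(["a", "b", "c"], {"a": "crm", "b": "crm"}))  # [['a', 'c'], ['b']]
```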

First use case: source-intercom — prevents duplicate concurrent requests to the companies endpoint.
Resolves: https://github.com/airbytehq/oncall/issues/8346

Updates since last revision

  • get_partition_router() helper on DefaultStream. Replaces messy hasattr chains in ConcurrentReadProcessor._collect_all_parent_stream_names() with a clean method that uses isinstance checks to traverse StreamSlicerPartitionGenerator → ConcurrentPerPartitionCursor → partition_router.
  • Deadlock validation in _apply_stream_groups(). Raises ValueError at config time if a stream and any of its parent streams are in the same blocking group.
  • is_done() safety check. Raises AirbyteTracedException (system_error) if _stream_instances_to_start_partition_generation is not empty after all streams are marked done — catches stuck-stream bugs at runtime.
  • Moved inline imports to module level. All imports (ConcurrentPerPartitionCursor, StreamSlicerPartitionGenerator, PartitionRouter, SubstreamPartitionRouter) are now at the top of files per Python coding standards.
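The is_done() safety check can be sketched like this. The class and field names mirror the PR text but are hypothetical stand-ins, and a plain RuntimeError replaces AirbyteTracedException to keep the sketch self-contained.

```python
class Processor:
    """Toy processor tracking which streams still await partition generation."""

    def __init__(self, streams: list) -> None:
        self._stream_instances_to_start_partition_generation = list(streams)
        self._all_streams = set(streams)
        self._done_streams: set = set()

    def start_partition_generation(self, stream: str) -> None:
        self._stream_instances_to_start_partition_generation.remove(stream)

    def mark_done(self, stream: str) -> None:
        self._done_streams.add(stream)

    def is_done(self) -> bool:
        all_done = self._done_streams == self._all_streams
        if all_done and self._stream_instances_to_start_partition_generation:
            # A deferred stream was never retried: fail loudly instead of hanging.
            raise RuntimeError(
                "Streams never started partition generation: "
                f"{self._stream_instances_to_start_partition_generation}"
            )
        return all_done
```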

Review & Testing Checklist for Human

  • Verify deadlock validation works correctly: Test with a manifest where a child stream and its parent are both in the same group — should raise ValueError at config time with a clear message.
  • Verify get_partition_router() handles all stream types: Test with both DefaultStream (with StreamSlicerPartitionGenerator) and legacy DeclarativeStream paths. Confirm it returns None for streams without partition routers.
  • Verify is_done() check doesn't mask real issues: If the safety check triggers, investigate the root cause (why streams remained in the queue) rather than just fixing the symptom.
  • Test with source-intercom: Run integration test with the new stream_groups config format to confirm end-to-end blocking behavior and verify no deadlocks occur.
  • Verify no circular import issues: The imports moved to module level (ConcurrentPerPartitionCursor, StreamSlicerPartitionGenerator, SubstreamPartitionRouter) should not cause circular dependencies in production connectors.

Recommended test plan:

  1. Create a test manifest with stream_groups referencing 2+ streams via $ref
  2. Run ConcurrentDeclarativeSource and verify streams in the same group are read sequentially
  3. Check logs for deferral messages and group activation/deactivation
  4. Test deadlock validation by placing a child stream and its parent in the same group (should fail at config time)
  5. Verify CI passes (especially schema validation and unit tests)
  6. Test with source-intercom to ensure no regressions and that blocking works as expected
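For step 1 of the plan, a minimal test manifest could look like this (the group name and stream definitions are illustrative, not from the PR):

```yaml
# Hypothetical minimal manifest fragment for exercising stream_groups.
stream_groups:
  shared_endpoint:
    streams:
      - "#/definitions/stream_a"
      - "#/definitions/stream_b"
    action: BlockSimultaneousSyncsAction
```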

Notes

  • Breaking changes: None. This is a new interface; existing connectors without stream_groups continue to work unchanged.
  • Backward compatibility: The internal ConcurrentReadProcessor still uses string-based group identifiers, so the blocking logic is unchanged.
  • Pre-existing test failure: test_read_with_concurrent_and_synchronous_streams_with_concurrent_state fails with SQLite locking issues (pre-existing, not caused by these changes).
  • Session URL: https://app.devin.ai/sessions/5184df5176d54d7c91ddcb9635c28dda
  • Requested by: Anatolii Yatsuk (gl_anatolii.yatsuk)

Summary by CodeRabbit

  • New Features

    • Manifest-configurable stream groups to prevent concurrent reads for grouped streams; group setting flows to stream components.
    • Active-stream tracking with deferral/retry logic to coordinate partition generation and start-next behavior.
    • Helper method get_partition_router() on DefaultStream for safe partition router access.
    • Deadlock validation to prevent parent-child streams from sharing the same blocking group.
    • Safety check in is_done() to detect stuck streams in partition generation queue.
  • Bug Fixes

    • Safer activation/deactivation and queue handling, avoiding infinite blocking and stuck/duplicated processing; improved logging for state transitions.
  • Tests

    • Extensive unit tests covering group blocking, parent/child interactions, deferral/retry semantics, queue ordering, and logging.

@github-actions github-actions bot added the enhancement New feature or request label Dec 30, 2025
@github-actions

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@tolik0/concurrent-source/add-block_simultaneous_read#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch tolik0/concurrent-source/add-block_simultaneous_read

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment



github-actions bot commented Dec 30, 2025

PyTest Results (Fast)

3 899 tests  +3 477   3 887 ✅ +3 475   6m 58s ⏱️ + 5m 20s
    1 suites ±    0      12 💤 +    3 
    1 files   ±    0       0 ❌  -     1 

Results for commit 756a966. ± Comparison against base commit 452acd1.

♻️ This comment has been updated with latest results.


github-actions bot commented Dec 30, 2025

PyTest Results (Full)

3 902 tests  +79   3 890 ✅ +81   11m 8s ⏱️ +43s
    1 suites ± 0      12 💤 ± 0 
    1 files   ± 0       0 ❌  -  2 

Results for commit 756a966. ± Comparison against base commit 452acd1.

♻️ This comment has been updated with latest results.


Anatolii Yatsuk (tolik0) commented Jan 6, 2026

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

🟦 Job completed successfully (no changes).


Anatolii Yatsuk (tolik0) commented Jan 6, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/20754787469


Anatolii Yatsuk (tolik0) commented Jan 9, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/20859295403


Anatolii Yatsuk (tolik0) commented Jan 12, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/20918717015

@tolik0 Anatolii Yatsuk (tolik0) marked this pull request as ready for review January 12, 2026 12:06
Copilot AI review requested due to automatic review settings January 12, 2026 12:06
Copilot AI (Contributor) left a comment

Pull request overview

This PR introduces a block_simultaneous_read feature to the Python CDK that prevents concurrent execution of streams sharing the same resource (API endpoint, session, or rate limit pool). The feature uses string-based group identifiers where streams with matching non-empty group names will not run concurrently, addressing issues like duplicate API calls when streams function as both standalone and parent streams.

Changes:

  • Added block_simultaneous_read property to stream interfaces and schema definitions with empty string as default (backward compatible)
  • Implemented blocking logic in ConcurrentReadProcessor that defers streams when their group or parent's group is active
  • Added comprehensive test coverage for various blocking scenarios including parent-child relationships and multi-level hierarchies

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
test_concurrent_read_processor.py Added comprehensive test suite covering all blocking scenarios
test_model_to_component_factory.py Added integration test verifying manifest-to-stream property flow
default_stream.py Added block_simultaneous_read property to DefaultStream
adapters.py Added property adapter for legacy stream compatibility
abstract_stream.py Added abstract property definition with documentation
model_to_component_factory.py Integrated property from manifest to stream construction
declarative_component_schema.py Generated schema with new property definition
declarative_component_schema.yaml Added schema definition with comprehensive documentation
concurrent_read_processor.py Implemented core blocking logic with group tracking and deferral
Comments suppressed due to low confidence (1)

airbyte_cdk/sources/declarative/declarative_component_schema.yaml:1

  • The description change for use_cache appears unrelated to the block_simultaneous_read feature. This change should be separated into its own PR or have an explanation for why it's included in this feature PR.
"$schema": http://json-schema.org/draft-07/schema#


@brianjlai Brian Lai (brianjlai) left a comment

I have some mixed feelings about how we represent this stream grouping to support blocking from the language. It feels like allowing it to be a string and just grouping them together ourselves might be a little bit error prone for users. It might take some redesigning of how we parse these groups, but I'm wondering if an interesting approach would be something like a StreamGroup, which would in turn contain the set of streams that need blocking. And potentially config options like blocking or whatever other ones we might think of. And maybe StreamGroup is itself a subclass of Stream. Granted, I haven't dedicated a ton of time to fleshing this out more, but maybe this is something we can sync on.

I think the other thing that worries me about this PR is that it feels quite risky because all of this grouping and pausing logic along the processing queue and which partitions we put on the queue feels like a lot of complexity and if we're not careful, we might introduce a deadlock for all of our connectors. I'm having a hard time parsing through the logic and getting a clear picture of what edge cases I might be missing honestly.

So two things would be:

  • I think us doing an in-person review of this over Zoom would be pretty helpful
  • I think the logic is meant to be a no-op if no streams define a blocking group so then self_stream_block_simultaneous_read will all be "". But the logic is still pretty deeply integrated across the whole flow so a bug here could affect all our connectors. Maybe we can try to find a better way of isolating this behavior, and that's where that dedicated stream group component might be interesting.

This feels like a legitimate full fledged project if I'm being honest because there are some DX considerations on how to represent this and maybe some ways we need to derisk this change a bit more.

        # All streams in the queue are currently blocked
        return None

    def is_done(self) -> bool:
Contributor Author (tolik0):

Add a check if no streams in _stream_instances_to_start_partition_generation

Contributor:

also worth it to maybe check that active stream groups is empty too

@brianjlai Brian Lai (brianjlai) left a comment

I have a few things and a lot of comments to clarify my understanding of the flow and what is expected to happen under different scenarios.

We already chatted over Zoom about the changes to the interface to more explicitly define stream groupings and the blocking actions, Daryna Ishchenko (@darynaishchenko) can approve that portion while I am out on PTO.

hasattr(stream, "_stream_partition_generator")
and hasattr(stream._stream_partition_generator, "_stream_slicer")
and hasattr(stream._stream_partition_generator._stream_slicer, "_partition_router")
):
Contributor:

This feels pretty messy to have to perform such a careful assertion on a bunch of potentially null private fields. I know it'll be too much work to make these public, but can we create a helper method on DefaultStream called get_partition_router() that does all this for us? and then we can avoid all these hasattr and we just get back the partition router or None? wdyt?

Contributor Author (tolik0):

Fixed

# Add back to the END of the queue for retry later
self._stream_instances_to_start_partition_generation.append(stream)
self._logger.info(
    f"Deferring stream '{stream_name}' (group '{stream_group}') because it's already active. Trying next stream."
)
Contributor:

I don't really understand this case. Why do we put the stream into the back of the stream_instances_to_start if this stream is already active. If this stream is already active, it's presumably already generating partitions and syncing so why do we want to retry it by putting it in the back of the queue?

Contributor Author (tolik0):

Added a comment. This is when we currently read this stream as a parent

self._logger.debug(f"Added '{stream_name}' to active group '{stream_group}'")

# Also mark all parent streams as active (they will be read from during partition generation)
parent_streams = self._collect_all_parent_stream_names(stream_name)
Contributor:

why do we need to call self._collect_all_parent_stream_names(stream_name) again? Above on line 286, we already get the parent streams https://github.com/airbytehq/airbyte-python-cdk/pull/870/files#diff-125b82e622876f39aefe23c0fee9fc828262e7db2556cb7fb7a7a1b9bdb2fd77R286 .

Contributor Author (tolik0):

Fixed

)
yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)

# Remove only this stream from active set (NOT parents)
Contributor:

Is the reason why we only remove this stream from the active set because we are already removing the parents from the active set when partition generation is completed in on_partition_generation_completed()

Contributor Author (tolik0):

Yes

and p in self._active_stream_names
]

if blocked_by_parents:
Contributor:

One scenario I want to make sure I understand, and that we have covered, is when a substream child_stream (w/ parent stream a_stream with a blocking group) is started before its parent.

So let's say that we happen to have the child_stream which is the first in the list of _stream_instances_to_start_partition_generation.

  1. We go through this method and start the stream because it is not blocked by any active streams
  2. We mark all the parent streams which includes a_stream as active and we set the parent's active group
  3. Parent a_stream is read from the list
  4. We see the group is already active from step 2.
  5. We put it back on the end of the list
  6. When we trigger on_partition_generation_completed() for child_stream we remove the parents from the active groups. We then call start_next_partition_generator().
  7. We process parent a_stream. It is no longer in a blocking group so we start that stream.
  8. We add a_stream to active streams and its blocking group

This is how I interpreted the flow below. Does that sound right to you Anatolii Yatsuk (@tolik0) ?

Contributor Author (tolik0):

Yes

…tream_groups

- Remove block_simultaneous_read property from DeclarativeStream schema
- Add top-level stream_groups with StreamGroup and BlockSimultaneousSyncsAction
- ConcurrentDeclarativeSource parses stream_groups and injects
  block_simultaneous_read into stream configs before factory processing
- Internal blocking logic in ConcurrentReadProcessor unchanged
- Update tests for new interface

Co-Authored-By: unknown <>
@devin-ai-integration devin-ai-integration bot changed the title feat: Add block_simultaneous_read to DefaultStream feat: Add block_simultaneous_read with top-level stream_groups interface Feb 25, 2026
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

2769-2769: Could we align the use_cache description with its False default, wdyt?

The description still reads like disabling cache is exceptional, which is confusing with the current default and behavior notes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@airbyte_cdk/sources/declarative/models/declarative_component_schema.py` at
line 2769, Update the description string for the use_cache field in
declarative_component_schema.py so it reflects that the default is False and
that enabling caching is opt-in; remove wording that implies disabling is
exceptional. Specifically, edit the description on the use_cache field to state
that caching is off by default, explain what enabling it does (repeated requests
to the same URL will be cached), note that parent streams may enable caching
automatically if applicable, and warn about potential duplicate records for
scroll-based pagination when caching is enabled.
🧹 Nitpick comments (4)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2)

85-93: Could we tighten streams validation to avoid no-op or duplicate entries, wdyt?

Right now streams can be empty and can include duplicates. Adding simple constraints would catch misconfigurations earlier.

🔧 Proposed schema hardening
      streams:
        title: Streams
        description: >
          List of references to streams that belong to this group. Use JSON references
          to stream definitions (e.g., "#/definitions/my_stream").
        type: array
+       minItems: 1
+       uniqueItems: true
        items:
          type: string
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml` around
lines 85 - 93, The streams array currently allows empty or duplicate entries;
update the streams schema block (the "streams" property in
declarative_component_schema.yaml) to enforce minItems: 1 and uniqueItems: true
and tighten item validation (e.g., require string pattern matching JSON refs
like "^#/definitions/.+") so misconfigured or no-op stream lists are rejected
early.

87-90: Could we clarify the expected streams identifier format in docs, wdyt?

The current text says “JSON references,” but many manifests define streams inline. A concrete accepted format/example here would reduce author confusion.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml` around
lines 87 - 90, Update the "streams" description in
declarative_component_schema.yaml to explicitly document the accepted identifier
formats: a JSON Reference string (e.g., "#/definitions/my_stream") and inline
stream definitions (an object with the stream schema), and include a short
example for each format to reduce author confusion; target the "streams"
property description text so it lists both formats and shows one-line examples.
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)

5217-5358: Could we tighten this test’s scope and nested-parent assertion for block_simultaneous_read, wdyt?

On Line 5217 the test name/docstring reads as stream-groups end-to-end, but this test is really validating post-injection factory behavior. Also, could we add an assertion that the parent stream instantiated inside child_stream’s SubstreamPartitionRouter keeps "issues_endpoint" too? That would better protect the parent-traversal path used by concurrent blocking logic.

Suggested assertion addition
     assert isinstance(child_stream, DefaultStream)
     assert child_stream.name == "child"
     assert child_stream.block_simultaneous_read == "issues_endpoint"
+    embedded_parent_stream = (
+        child_stream._stream_partition_generator._stream_slicer.parent_stream_configs[0].stream
+    )
+    assert embedded_parent_stream.block_simultaneous_read == "issues_endpoint"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@unit_tests/sources/declarative/parsers/test_model_to_component_factory.py`
around lines 5217 - 5358, Rename the test/docstring to reflect it validates
factory behavior (post-injection) rather than end-to-end stream-groups and
tighten scope to only check factory-created streams; then add an assertion that
the parent stream object used by the child stream's SubstreamPartitionRouter
retains the injected block_simultaneous_read value. Specifically, in this test
after creating child_stream (DefaultStream) inspect the SubstreamPartitionRouter
instance (referenced by child_stream.partition_router or similar on the created
DefaultStream) and assert that its ParentStreamConfig / instantiated parent
stream (the entry corresponding to the "#/parent_stream" manifest) has
block_simultaneous_read == "issues_endpoint". Ensure the new assertion
references SubstreamPartitionRouter and ParentStreamConfig (or the actual
attribute names on child_stream.partition_router) so the factory’s nested-parent
traversal is covered.
unit_tests/sources/declarative/test_concurrent_declarative_source.py (1)

5155-5218: Could we lock down duplicate stream membership behavior with one more case, wdyt?

If a stream appears in multiple groups, the current mapping will silently overwrite with the last group. Adding an explicit test for that would make the intended precedence (or future validation expectation) clear.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@unit_tests/sources/declarative/test_concurrent_declarative_source.py` around
lines 5155 - 5218, Add a new pytest.param to the existing parametrize block that
covers duplicate stream membership: create a manifest where "stream_groups"
contains "group_a" and "group_b" and both list a stream named "dup_stream" (type
"DeclarativeStream"), and set the expected mapping to reflect the current
precedence (last-wins) by mapping "dup_stream" to "group_b"; give the case an id
like "duplicate_membership_last_wins" so the test documents and locks down the
current behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@airbyte_cdk/sources/declarative/concurrent_declarative_source.py`:
- Around line 552-565: When iterating stream_groups, do not treat string refs as
opaque keys: resolve any string stream_ref (e.g., "#/definitions/..") to the
actual stream definition dict and read its "name" field (use the same
reference-resolution helper or definitions map used elsewhere) and then assign
stream_name_to_group[stream_name] = group_name; if a stream_name is already
present mapped to a different group, reject with a clear exception or error (no
silent last-write-wins). Update the logic that handles isinstance(stream_ref,
str) to perform resolution to a dict, extract the runtime "name", and
enforce/raise on conflicting group assignments for the same stream_name.

---

Duplicate comments:
In `@airbyte_cdk/sources/declarative/models/declarative_component_schema.py`:
- Line 2769: Update the description string for the use_cache field in
declarative_component_schema.py so it reflects that the default is False and
that enabling caching is opt-in; remove wording that implies disabling is
exceptional. Specifically, edit the description on the use_cache field to state
that caching is off by default, explain what enabling it does (repeated requests
to the same URL will be cached), note that parent streams may enable caching
automatically if applicable, and warn about potential duplicate records for
scroll-based pagination when caching is enabled.

---

Nitpick comments:
In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml`:
- Around line 85-93: The streams array currently allows empty or duplicate
entries; update the streams schema block (the "streams" property in
declarative_component_schema.yaml) to enforce minItems: 1 and uniqueItems: true
and tighten item validation (e.g., require string pattern matching JSON refs
like "^#/definitions/.+") so misconfigured or no-op stream lists are rejected
early.
- Around line 87-90: Update the "streams" description in
declarative_component_schema.yaml to explicitly document the accepted identifier
formats: a JSON Reference string (e.g., "#/definitions/my_stream") and inline
stream definitions (an object with the stream schema), and include a short
example for each format to reduce author confusion; target the "streams"
property description text so it lists both formats and shows one-line examples.

In `@unit_tests/sources/declarative/parsers/test_model_to_component_factory.py`:
- Around line 5217-5358: Rename the test/docstring to reflect it validates
factory behavior (post-injection) rather than end-to-end stream-groups and
tighten scope to only check factory-created streams; then add an assertion that
the parent stream object used by the child stream's SubstreamPartitionRouter
retains the injected block_simultaneous_read value. Specifically, in this test
after creating child_stream (DefaultStream) inspect the SubstreamPartitionRouter
instance (referenced by child_stream.partition_router or similar on the created
DefaultStream) and assert that its ParentStreamConfig / instantiated parent
stream (the entry corresponding to the "#/parent_stream" manifest) has
block_simultaneous_read == "issues_endpoint". Ensure the new assertion
references SubstreamPartitionRouter and ParentStreamConfig (or the actual
attribute names on child_stream.partition_router) so the factory’s nested-parent
traversal is covered.

In `@unit_tests/sources/declarative/test_concurrent_declarative_source.py`:
- Around line 5155-5218: Add a new pytest.param to the existing parametrize
block that covers duplicate stream membership: create a manifest where
"stream_groups" contains "group_a" and "group_b" and both list a stream named
"dup_stream" (type "DeclarativeStream"), and set the expected mapping to reflect
the current precedence (last-wins) by mapping "dup_stream" to "group_b"; give
the case an id like "duplicate_membership_last_wins" so the test documents and
locks down the current behavior.
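The suggested case could be sketched like this, using a hypothetical stand-alone re-implementation of the mapping helper (the real test would exercise `_build_stream_name_to_group`; names here are illustrative):

```python
# Illustrative stand-in for the mapping helper, assuming the current
# last-write-wins semantics for duplicate group membership.
from typing import Any, Dict, Mapping


def build_stream_name_to_group(manifest: Mapping[str, Any]) -> Dict[str, str]:
    """Map each stream name to its group; later groups silently win on duplicates."""
    mapping: Dict[str, str] = {}
    for group_name, group_config in manifest.get("stream_groups", {}).items():
        for stream_ref in group_config.get("streams", []):
            if isinstance(stream_ref, dict) and stream_ref.get("name"):
                mapping[stream_ref["name"]] = group_name
    return mapping


manifest = {
    "stream_groups": {
        "group_a": {"streams": [{"type": "DeclarativeStream", "name": "dup_stream"}]},
        "group_b": {"streams": [{"type": "DeclarativeStream", "name": "dup_stream"}]},
    }
}
# Documents the current precedence: the group listed last wins.
assert build_stream_name_to_group(manifest) == {"dup_stream": "group_b"}
```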

📥 Commits

Reviewing files that changed from the base of the PR and between 8c06ce6 and 49b0174.

📒 Files selected for processing (6)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
  • unit_tests/sources/declarative/test_concurrent_declarative_source.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Comment on lines +552 to +565
```python
for group_name, group_config in stream_groups.items():
    streams = group_config.get("streams", [])
    for stream_ref in streams:
        if isinstance(stream_ref, dict):
            # After reference resolution, stream_ref is a full stream definition dict
            stream_name = stream_ref.get("name", "")
            if stream_name:
                stream_name_to_group[stream_name] = group_name
        elif isinstance(stream_ref, str):
            # If not resolved (shouldn't happen normally), extract name from ref path
            # e.g., "#/definitions/my_stream" -> "my_stream"
            if stream_ref.startswith("#/definitions/"):
                stream_name = stream_ref.split("/")[-1]
                stream_name_to_group[stream_name] = group_name
```
⚠️ Potential issue | 🟠 Major

Could we resolve string refs to actual stream name values and reject conflicting group assignments, wdyt?

Line 564 currently maps #/.../<definition_key> to <definition_key>, but downstream matching uses the runtime stream name (Line 412). If a definition key differs from the stream name, blocking is silently skipped. Also, duplicate stream membership across groups currently becomes last-write-wins.

💡 Suggested hardening diff

```diff
@@
     def _build_stream_name_to_group(manifest: Mapping[str, Any]) -> Dict[str, str]:
@@
-        for group_name, group_config in stream_groups.items():
+        def _resolve_ref_to_stream_name(stream_ref: str) -> Optional[str]:
+            if not stream_ref.startswith("#/"):
+                return None
+            node: Any = manifest
+            for part in stream_ref.removeprefix("#/").split("/"):
+                if not isinstance(node, Mapping) or part not in node:
+                    return None
+                node = node[part]
+            if isinstance(node, Mapping):
+                resolved_name = node.get("name")
+                if isinstance(resolved_name, str) and resolved_name:
+                    return resolved_name
+            return None
+
+        def _assign(stream_name: str, group_name: str) -> None:
+            existing_group = stream_name_to_group.get(stream_name)
+            if existing_group and existing_group != group_name:
+                raise ValueError(
+                    f"Stream '{stream_name}' is assigned to multiple stream_groups: "
+                    f"'{existing_group}' and '{group_name}'."
+                )
+            stream_name_to_group[stream_name] = group_name
+
+        for group_name, group_config in stream_groups.items():
             streams = group_config.get("streams", [])
             for stream_ref in streams:
                 if isinstance(stream_ref, dict):
                     # After reference resolution, stream_ref is a full stream definition dict
                     stream_name = stream_ref.get("name", "")
                     if stream_name:
-                        stream_name_to_group[stream_name] = group_name
+                        _assign(stream_name, group_name)
                 elif isinstance(stream_ref, str):
                     # If not resolved (shouldn't happen normally), extract name from ref path
                     # e.g., "#/definitions/my_stream" -> "my_stream"
-                    if stream_ref.startswith("#/definitions/"):
-                        stream_name = stream_ref.split("/")[-1]
-                        stream_name_to_group[stream_name] = group_name
+                    resolved_name = _resolve_ref_to_stream_name(stream_ref)
+                    if resolved_name:
+                        _assign(resolved_name, group_name)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@airbyte_cdk/sources/declarative/concurrent_declarative_source.py` around
lines 552-565: when iterating stream_groups, do not treat string refs as
opaque keys: resolve any string stream_ref (e.g., "#/definitions/..") to the
actual stream definition dict and read its "name" field (use the same
reference-resolution helper or definitions map used elsewhere) and then assign
stream_name_to_group[stream_name] = group_name; if a stream_name is already
present mapped to a different group, reject with a clear exception or error (no
silent last-write-wins). Update the logic that handles isinstance(stream_ref,
str) to perform resolution to a dict, extract the runtime "name", and
enforce/raise on conflicting group assignments for the same stream_name.

- Add stream_name_to_group parameter to ModelToComponentFactory.__init__()
- Add set_stream_name_to_group() method for post-init configuration
- Factory now looks up block_simultaneous_read from its own mapping
- Remove config injection hack from ConcurrentDeclarativeSource.streams()
- Update tests to use factory-based approach instead of extra fields

…oded dict

- Test now defines stream_groups with references in the manifest YAML
- Uses _build_stream_name_to_group() to derive the mapping from manifest
- Removed test_set_stream_name_to_group (redundant with the manifest-based test)
- Added ConcurrentDeclarativeSource import for _build_stream_name_to_group

Child streams that depend on parent streams should not be in the same
group, as this would cause a deadlock (child needs to read parent).

- Factory now owns the stream_groups resolution via set_stream_groups(manifest)
- ConcurrentDeclarativeSource just calls factory.set_stream_groups(manifest)
- Removed _build_stream_name_to_group from ConcurrentDeclarativeSource
- Updated tests to use factory's _build_stream_name_to_group directly

…of factory

- Removed _build_stream_name_to_group, set_stream_groups, _stream_name_to_group from factory
- Factory no longer knows about stream_groups at all
- Added _apply_stream_groups to ConcurrentDeclarativeSource: creates streams first, then sets block_simultaneous_read on matching DefaultStream instances
- Added block_simultaneous_read setter on DefaultStream
- Replaced mock-based tests with parametrized tests using real DefaultStream instances

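The post-creation wiring described in this commit could be sketched as follows. This is a heavily simplified stand-in: the real `DefaultStream` has many more fields, and `apply_stream_groups` here only models the setter-based tagging step:

```python
# Minimal sketch: tag factory-created streams with their group after creation,
# via a block_simultaneous_read setter (names simplified from the real CDK classes).
from typing import Dict, List, Optional


class DefaultStream:
    def __init__(self, name: str) -> None:
        self.name = name
        self._block_simultaneous_read: Optional[str] = None

    @property
    def block_simultaneous_read(self) -> Optional[str]:
        return self._block_simultaneous_read

    @block_simultaneous_read.setter
    def block_simultaneous_read(self, group_name: str) -> None:
        self._block_simultaneous_read = group_name


def apply_stream_groups(streams: List[DefaultStream], name_to_group: Dict[str, str]) -> None:
    """Set block_simultaneous_read on each stream that belongs to a group."""
    for stream in streams:
        group = name_to_group.get(stream.name)
        if group is not None:
            stream.block_simultaneous_read = group


streams = [DefaultStream("deals"), DefaultStream("companies"), DefaultStream("tickets")]
apply_stream_groups(streams, {"deals": "crm_objects", "companies": "crm_objects"})
assert [s.block_simultaneous_read for s in streams] == ["crm_objects", "crm_objects", None]
```

Keeping this step in `ConcurrentDeclarativeSource` (rather than the factory) means the factory stays entirely unaware of stream_groups.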
@agarctfi

Brian Lai (@brianjlai) Anatolii Yatsuk (@tolik0) I'm not sure who is working on this. I see commits being made with no authors. However, flagging that this is now being swarmed here.

I can try to test these changes if they're ready. Please let me know.

Anatolii Yatsuk (tolik0) and others added 4 commits March 4, 2026 18:40
Replace hasattr chain in ConcurrentReadProcessor._collect_all_parent_stream_names
with DefaultStream.get_partition_router() that safely traverses the internal
partition_generator -> stream_slicer -> partition_router chain using isinstance checks.

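The guarded traversal this commit describes might look roughly like this. The stand-in classes model only the attributes the walk touches, and the `getattr`-plus-`isinstance` style is a simplification of the real implementation:

```python
# Sketch of a safe partition_generator -> stream_slicer -> partition_router walk
# that replaces a hasattr chain; returns None anywhere the chain breaks.
from typing import Any, Optional


class PartitionRouter: ...


class StreamSlicer:
    def __init__(self, partition_router: Any) -> None:
        self.partition_router = partition_router


class PartitionGenerator:
    def __init__(self, stream_slicer: Any) -> None:
        self.stream_slicer = stream_slicer


class DefaultStream:
    def __init__(self, partition_generator: Any) -> None:
        self._partition_generator = partition_generator

    def get_partition_router(self) -> Optional[PartitionRouter]:
        """Traverse the internal chain; yield the router only if it is the right type."""
        slicer = getattr(self._partition_generator, "stream_slicer", None)
        router = getattr(slicer, "partition_router", None)
        return router if isinstance(router, PartitionRouter) else None


router = PartitionRouter()
stream = DefaultStream(PartitionGenerator(StreamSlicer(router)))
assert stream.get_partition_router() is router
assert DefaultStream(object()).get_partition_router() is None
```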
_apply_stream_groups now checks that no stream shares a group with any
of its parent streams (via get_partition_router). Raises ValueError at
config time if a deadlock-causing configuration is detected.

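The deadlock check described here could be sketched as below; the flat `parents_by_stream` lookup stands in for walking each stream's partition router, and all names are illustrative:

```python
# Hedged sketch of the config-time deadlock validation: a stream must not share
# a stream_group with any of its parent streams.
from typing import Dict, List


def validate_no_parent_in_same_group(
    name_to_group: Dict[str, str], parents_by_stream: Dict[str, List[str]]
) -> None:
    for stream_name, group in name_to_group.items():
        for parent_name in parents_by_stream.get(stream_name, []):
            if name_to_group.get(parent_name) == group:
                raise ValueError(
                    f"Stream '{stream_name}' and its parent '{parent_name}' are both in "
                    f"stream_group '{group}'; this would deadlock because the child must "
                    "read the parent during partition generation."
                )


# A parent/child pair in the same group is rejected at config time:
try:
    validate_no_parent_in_same_group({"child": "g1", "parent": "g1"}, {"child": ["parent"]})
    raised = False
except ValueError:
    raised = True
assert raised

# Parents in a different group (or no group) are fine:
validate_no_parent_in_same_group({"child": "g1", "parent": "g2"}, {"child": ["parent"]})
```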
… done

Adds a safety check in is_done() that raises AirbyteTracedException
(system_error) if streams remain in the partition generation queue after
all streams are marked done. Also moves inline imports to module level
and updates test mocks to use DefaultStream with get_partition_router().

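The safety check in `is_done()` might be modeled like this; `RuntimeError` stands in for `AirbyteTracedException` with a `system_error` failure type, and the queue is reduced to a plain list:

```python
# Sketch of the is_done() guard: if every stream is marked done but streams are
# still queued for partition generation, the sync is wedged and we fail loudly.
from typing import List, Set


def check_all_done(streams_done: Set[str], pending_partition_generation: List[str]) -> bool:
    if pending_partition_generation and all(
        s in streams_done for s in pending_partition_generation
    ):
        # In the CDK this raises AirbyteTracedException (system_error).
        raise RuntimeError(
            f"Streams {pending_partition_generation} are still queued for partition "
            "generation even though all streams are marked done."
        )
    return not pending_partition_generation


assert check_all_done({"a", "b"}, []) is True

try:
    check_all_done({"a"}, ["a"])
    raised = False
except RuntimeError:
    raised = True
assert raised
```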
…d concurrent_declarative_source.py

…ct parent streams

…artition_router()

…ps check, and get_partition_router


```python
# Complete partition generation for parent (parent has no partitions, so it's done)
sentinel = PartitionGenerationCompletedSentinel(parent)
messages = list(handler.on_partition_generation_completed(sentinel))
```
@brianjlai Brian Lai (brianjlai) left a comment

I mentioned a few things and one potential interesting change to how we present the StreamGroup component within the ConcurrentSource component. Not blocking and ultimately not something that must be done, but a thought I had.

From a functional side I think we're all set. But again, I think we need this to be a dev CDK version first so we can roll it out to Intercom safely without having it go out to all connectors during the automatic version bump.

```yaml
    description: A description of the connector. It will be presented on the Source documentation page.
additionalProperties: false
definitions:
  StreamGroup:
```
nit: can we move this so it's alphabetized instead of just at the top of definitions

"$ref": "#/definitions/ConcurrencyLevel"
api_budget:
"$ref": "#/definitions/HTTPAPIBudget"
stream_groups:
an interesting idea I had was instead of needing to define a separate stream_groups field on the DeclarativeSource component, what if we were to change it so that the streams list can take in an additional type in the anyOf for "$ref": "#/definitions/StreamGroup". And so therefore StreamGroup components can just be defined in there. And since right now stream_groups is a map, we'd have to adjust the StreamGroup component to also add a name field since we won't have a key if we remove this stream_groups mapping

It does make how we implement _apply_stream_groups a little more complicated because we need to check the isinstance for StreamGroup, but it also feels like a natural place to place the StreamGroup since a StreamGroup is just a list of grouped streams. But this feels like it might be a little bit of a cleaner interface if we can make it work.

Anatolii Yatsuk (@tolik0) what do you think about this design?
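If it helps to visualize the proposal, the inlined form might look roughly like this — purely hypothetical, since this shape is not what the PR implements:

```yaml
streams:
  - "#/definitions/users_stream"
  - type: StreamGroup
    name: crm_objects
    action: BlockSimultaneousSyncsAction
    streams:
      - "#/definitions/deals_property_history_stream"
      - "#/definitions/companies_property_history_stream"
```

Here `StreamGroup` gains a `name` field (replacing the key it had in the `stream_groups` map), and `_apply_stream_groups` would need an `isinstance` check while iterating the mixed `streams` list.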

Labels

enhancement New feature or request
