feat: Add block_simultaneous_read with top-level stream_groups interface #870

Anatolii Yatsuk (tolik0) wants to merge 26 commits into main from tolik0/concurrent-source/add-block_simultaneous_read
Conversation
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@tolik0/concurrent-source/add-block_simultaneous_read#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch tolik0/concurrent-source/add-block_simultaneous_read

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:
- /autofix
- /prerelease
Pull request overview
This PR introduces a block_simultaneous_read feature to the Python CDK that prevents concurrent execution of streams sharing the same resource (API endpoint, session, or rate limit pool). The feature uses string-based group identifiers where streams with matching non-empty group names will not run concurrently, addressing issues like duplicate API calls when streams function as both standalone and parent streams.
Changes:
- Added block_simultaneous_read property to stream interfaces and schema definitions with empty string as default (backward compatible)
- Implemented blocking logic in ConcurrentReadProcessor that defers streams when their group or parent's group is active
- Added comprehensive test coverage for various blocking scenarios including parent-child relationships and multi-level hierarchies
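The blocking rule the overview describes can be sketched in a few lines of Python. This is an illustration only, with hypothetical names (`can_start`, `active_groups`), not the CDK implementation:

```python
from typing import Set


def can_start(stream_group: str, active_groups: Set[str]) -> bool:
    """A stream may start unless it declares a non-empty group that is already active."""
    return stream_group == "" or stream_group not in active_groups


active = {"companies_endpoint"}
assert can_start("", active)                        # empty group (the default) never blocks
assert can_start("contacts_endpoint", active)       # different group: allowed to run
assert not can_start("companies_endpoint", active)  # matching non-empty group: deferred
```

The empty-string default is what makes the feature backward compatible: streams that declare no group can never match an active group.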
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| test_concurrent_read_processor.py | Added comprehensive test suite covering all blocking scenarios |
| test_model_to_component_factory.py | Added integration test verifying manifest-to-stream property flow |
| default_stream.py | Added block_simultaneous_read property to DefaultStream |
| adapters.py | Added property adapter for legacy stream compatibility |
| abstract_stream.py | Added abstract property definition with documentation |
| model_to_component_factory.py | Integrated property from manifest to stream construction |
| declarative_component_schema.py | Generated schema with new property definition |
| declarative_component_schema.yaml | Added schema definition with comprehensive documentation |
| concurrent_read_processor.py | Implemented core blocking logic with group tracking and deferral |
Comments suppressed due to low confidence (1)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml:1
- The description change for use_cache appears unrelated to the block_simultaneous_read feature. This change should be separated into its own PR or have an explanation for why it's included in this feature PR.
"$schema": http://json-schema.org/draft-07/schema#
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
Brian Lai (brianjlai)
left a comment
I have some mixed feelings about how we represent this stream grouping to support blocking from the language. It feels like allowing it to be a string and just grouping them together ourselves might be a little bit error-prone for users. It might take some redesigning of how we parse these groups, but I'm wondering if an interesting approach would be something like a StreamGroup which would in turn contain a set of streams that need to be blocking. And potentially config options like blocking or whatever other ones we might think of. And maybe StreamGroup is itself a subclass of a Stream. Granted I haven't dedicated a ton of time to fleshing this out more, but maybe this is something we can sync on.
I think the other thing that worries me about this PR is that it feels quite risky because all of this grouping and pausing logic along the processing queue and which partitions we put on the queue feels like a lot of complexity and if we're not careful, we might introduce a deadlock for all of our connectors. I'm having a hard time parsing through the logic and getting a clear picture of what edge cases I might be missing honestly.
So two things would be:
- I think us doing an in-person review of this over Zoom would be pretty helpful
- I think the logic is meant to be a no-op if no streams define a blocking group, so then self._stream_block_simultaneous_read will all be "". But the logic is still pretty deeply integrated across the whole flow, so a bug here could affect all our connectors. Maybe we can try to find a better way of isolating this behavior, and that's where that dedicated stream group component might be interesting.
This feels like a legitimate full fledged project if I'm being honest because there are some DX considerations on how to represent this and maybe some ways we need to derisk this change a bit more.
            # All streams in the queue are currently blocked
            return None

    def is_done(self) -> bool:
Add a check if no streams in _stream_instances_to_start_partition_generation

also worth it to maybe check that active stream groups is empty too
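The two suggestions above can be sketched as a toy model. Attribute names are assumed for illustration; this is not the actual ConcurrentReadProcessor:

```python
class ProcessorSketch:
    """Toy model: done only when every stream finished, the retry queue is empty,
    and no blocking group is still marked active."""

    def __init__(self, stream_names):
        self._stream_names = set(stream_names)
        self._streams_done = set()
        self._stream_instances_to_start_partition_generation = []
        self._active_stream_groups = set()

    def is_done(self) -> bool:
        return (
            self._streams_done == self._stream_names
            and not self._stream_instances_to_start_partition_generation
            and not self._active_stream_groups
        )


p = ProcessorSketch(["a", "b"])
p._streams_done = {"a", "b"}
assert p.is_done()
p._active_stream_groups = {"group_a"}
assert not p.is_done()  # a lingering active group should block completion
```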
Brian Lai (brianjlai)
left a comment
There was a problem hiding this comment.
I have a few things and a lot of comments to clarify my understanding of the flow and what is expected to happen under different scenarios.
We already chatted over Zoom about the changes to the interface to more explicitly define stream groupings and the blocking actions, Daryna Ishchenko (@darynaishchenko) can approve that portion while I am out on PTO.
    hasattr(stream, "_stream_partition_generator")
    and hasattr(stream._stream_partition_generator, "_stream_slicer")
    and hasattr(stream._stream_partition_generator._stream_slicer, "_partition_router")
):
This feels pretty messy to have to perform such a careful assertion on a bunch of potentially null private fields. I know it'll be too much work to make these public, but can we create a helper method on DefaultStream called get_partition_router() that does all this for us? and then we can avoid all these hasattr and we just get back the partition router or None? wdyt?
Fixed
    # Add back to the END of the queue for retry later
    self._stream_instances_to_start_partition_generation.append(stream)
    self._logger.info(
        f"Deferring stream '{stream_name}' (group '{stream_group}') because it's already active. Trying next stream."
I don't really understand this case. Why do we put the stream into the back of stream_instances_to_start if this stream is already active? If this stream is already active, it's presumably already generating partitions and syncing, so why do we want to retry it by putting it in the back of the queue?

Added a comment. This is when we currently read this stream as a parent.
    self._logger.debug(f"Added '{stream_name}' to active group '{stream_group}'")

    # Also mark all parent streams as active (they will be read from during partition generation)
    parent_streams = self._collect_all_parent_stream_names(stream_name)
why do we need to call self._collect_all_parent_stream_names(stream_name) again? Above on line 286, we already get the parent streams: https://github.com/airbytehq/airbyte-python-cdk/pull/870/files#diff-125b82e622876f39aefe23c0fee9fc828262e7db2556cb7fb7a7a1b9bdb2fd77R286

Fixed
    )
    yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)

    # Remove only this stream from active set (NOT parents)
Is the reason why we only remove this stream from the active set because we are already removing the parents from the active set when partition generation is completed in on_partition_generation_completed()?

Yes
        and p in self._active_stream_names
    ]

    if blocked_by_parents:
One scenario I want to make sure I understand and that we have covered is when a substream child_stream (with parent stream a_stream in a blocking group) is started before its parent.
So let's say that we happen to have the child_stream which is the first in the list of _stream_instances_to_start_partition_generation.
1. We go through this method and start the stream because it is not blocked by any active streams
2. We mark all the parent streams, which includes a_stream, as active and we set the parent's active group
3. Parent a_stream is read from the list
4. We see the group is already active from step 2
5. We put it back on the end of the list
6. When we trigger on_partition_generation_completed() for child_stream we remove the parents from the active groups. We then call start_next_partition_generator()
7. We process parent a_stream. It is no longer in a blocking group so we start that stream
8. We add a_stream to active streams and its blocking group

This is how I interpreted the flow below. Does that sound right to you Anatolii Yatsuk (@tolik0)?
Yes
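The ordering in the scenario above can be replayed with a toy queue. Names and data are assumed for illustration and deliberately simplified from the real processor:

```python
from collections import deque

queue = deque(["child_stream", "a_stream"])
parents = {"child_stream": ["a_stream"]}
groups = {"a_stream": "group_a", "child_stream": ""}
active_groups: set = set()
started = []


def try_start():
    """Pop streams until one can start; deferred streams go to the back of the queue."""
    for _ in range(len(queue)):
        stream = queue.popleft()
        group = groups[stream]
        if group and group in active_groups:
            queue.append(stream)  # defer: its group is already active
            continue
        started.append(stream)
        for parent in parents.get(stream, []):
            active_groups.add(groups[parent])  # parents are read during partition generation
        if group:
            active_groups.add(group)
        return stream
    return None


assert try_start() == "child_stream"  # child starts first; marks parent's group active
assert try_start() is None            # a_stream is deferred: its group is active
# child's partition generation completes -> parents leave the active groups
active_groups.discard(groups["a_stream"])
assert try_start() == "a_stream"      # now the parent can start
```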
…tream_groups

- Remove block_simultaneous_read property from DeclarativeStream schema
- Add top-level stream_groups with StreamGroup and BlockSimultaneousSyncsAction
- ConcurrentDeclarativeSource parses stream_groups and injects block_simultaneous_read into stream configs before factory processing
- Internal blocking logic in ConcurrentReadProcessor unchanged
- Update tests for new interface

Co-Authored-By: unknown <>
Actionable comments posted: 1
♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
2769-2769: Could we align the use_cache description with its False default, wdyt? The description still reads like disabling cache is exceptional, which is confusing with the current default and behavior notes.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte_cdk/sources/declarative/models/declarative_component_schema.py` at line 2769, Update the description string for the use_cache field in declarative_component_schema.py so it reflects that the default is False and that enabling caching is opt-in; remove wording that implies disabling is exceptional. Specifically, edit the description on the use_cache field to state that caching is off by default, explain what enabling it does (repeated requests to the same URL will be cached), note that parent streams may enable caching automatically if applicable, and warn about potential duplicate records for scroll-based pagination when caching is enabled.
🧹 Nitpick comments (4)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2)
85-93: Could we tighten streams validation to avoid no-op or duplicate entries, wdyt? Right now streams can be empty and can include duplicates. Adding simple constraints would catch misconfigurations earlier.

🔧 Proposed schema hardening

     streams:
       title: Streams
       description: >
         List of references to streams that belong to this group.
         Use JSON references to stream definitions (e.g., "#/definitions/my_stream").
       type: array
    +  minItems: 1
    +  uniqueItems: true
       items:
         type: string

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml` around lines 85 - 93, The streams array currently allows empty or duplicate entries; update the streams schema block (the "streams" property in declarative_component_schema.yaml) to enforce minItems: 1 and uniqueItems: true and tighten item validation (e.g., require string pattern matching JSON refs like "^#/definitions/.+") so misconfigured or no-op stream lists are rejected early.
87-90: Could we clarify the expected streams identifier format in docs, wdyt? The current text says "JSON references," but many manifests define streams inline. A concrete accepted format/example here would reduce author confusion.

🤖 Prompt for AI Agents
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml` around lines 87 - 90, Update the "streams" description in declarative_component_schema.yaml to explicitly document the accepted identifier formats: a JSON Reference string (e.g., "#/definitions/my_stream") and inline stream definitions (an object with the stream schema), and include a short example for each format to reduce author confusion; target the "streams" property description text so it lists both formats and shows one-line examples.

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
5217-5358: Could we tighten this test's scope and nested-parent assertion for block_simultaneous_read, wdyt? On line 5217 the test name/docstring reads as stream-groups end-to-end, but this test is really validating post-injection factory behavior. Also, could we add an assertion that the parent stream instantiated inside child_stream's SubstreamPartitionRouter keeps "issues_endpoint" too? That would better protect the parent-traversal path used by concurrent blocking logic.

Suggested assertion addition

     assert isinstance(child_stream, DefaultStream)
     assert child_stream.name == "child"
     assert child_stream.block_simultaneous_read == "issues_endpoint"
    +embedded_parent_stream = (
    +    child_stream._stream_partition_generator._stream_slicer.parent_stream_configs[0].stream
    +)
    +assert embedded_parent_stream.block_simultaneous_read == "issues_endpoint"

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@unit_tests/sources/declarative/parsers/test_model_to_component_factory.py` around lines 5217 - 5358, Rename the test/docstring to reflect it validates factory behavior (post-injection) rather than end-to-end stream-groups and tighten scope to only check factory-created streams; then add an assertion that the parent stream object used by the child stream's SubstreamPartitionRouter retains the injected block_simultaneous_read value. Specifically, in this test after creating child_stream (DefaultStream) inspect the SubstreamPartitionRouter instance (referenced by child_stream.partition_router or similar on the created DefaultStream) and assert that its ParentStreamConfig / instantiated parent stream (the entry corresponding to the "#/parent_stream" manifest) has block_simultaneous_read == "issues_endpoint". Ensure the new assertion references SubstreamPartitionRouter and ParentStreamConfig (or the actual attribute names on child_stream.partition_router) so the factory's nested-parent traversal is covered.

unit_tests/sources/declarative/test_concurrent_declarative_source.py (1)
5155-5218: Could we lock down duplicate stream membership behavior with one more case, wdyt? If a stream appears in multiple groups, the current mapping will silently overwrite with the last group. Adding an explicit test for that would make the intended precedence (or future validation expectation) clear.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@unit_tests/sources/declarative/test_concurrent_declarative_source.py` around lines 5155 - 5218, Add a new pytest.param to the existing parametrize block that covers duplicate stream membership: create a manifest where "stream_groups" contains "group_a" and "group_b" and both list a stream named "dup_stream" (type "DeclarativeStream"), and set the expected mapping to reflect the current precedence (last-wins) by mapping "dup_stream" to "group_b"; give the case an id like "duplicate_membership_last_wins" so the test documents and locks down the current behavior.
ℹ️ Review info
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
- unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
- unit_tests/sources/declarative/test_concurrent_declarative_source.py
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
    for group_name, group_config in stream_groups.items():
        streams = group_config.get("streams", [])
        for stream_ref in streams:
            if isinstance(stream_ref, dict):
                # After reference resolution, stream_ref is a full stream definition dict
                stream_name = stream_ref.get("name", "")
                if stream_name:
                    stream_name_to_group[stream_name] = group_name
            elif isinstance(stream_ref, str):
                # If not resolved (shouldn't happen normally), extract name from ref path
                # e.g., "#/definitions/my_stream" -> "my_stream"
                if stream_ref.startswith("#/definitions/"):
                    stream_name = stream_ref.split("/")[-1]
                    stream_name_to_group[stream_name] = group_name
Could we resolve string refs to actual stream name values and reject conflicting group assignments, wdyt?
Line 564 currently maps #/.../<definition_key> to <definition_key>, but downstream matching uses runtime stream name (Line 412). If definition keys differ from stream names, blocking is silently skipped. Also, duplicate stream membership across groups currently becomes last-write-wins.
💡 Suggested hardening diff
@@
def _build_stream_name_to_group(manifest: Mapping[str, Any]) -> Dict[str, str]:
@@
- for group_name, group_config in stream_groups.items():
+ def _resolve_ref_to_stream_name(stream_ref: str) -> Optional[str]:
+ if not stream_ref.startswith("#/"):
+ return None
+ node: Any = manifest
+ for part in stream_ref.removeprefix("#/").split("/"):
+ if not isinstance(node, Mapping) or part not in node:
+ return None
+ node = node[part]
+ if isinstance(node, Mapping):
+ resolved_name = node.get("name")
+ if isinstance(resolved_name, str) and resolved_name:
+ return resolved_name
+ return None
+
+ def _assign(stream_name: str, group_name: str) -> None:
+ existing_group = stream_name_to_group.get(stream_name)
+ if existing_group and existing_group != group_name:
+ raise ValueError(
+ f"Stream '{stream_name}' is assigned to multiple stream_groups: "
+ f"'{existing_group}' and '{group_name}'."
+ )
+ stream_name_to_group[stream_name] = group_name
+
+ for group_name, group_config in stream_groups.items():
streams = group_config.get("streams", [])
for stream_ref in streams:
if isinstance(stream_ref, dict):
# After reference resolution, stream_ref is a full stream definition dict
stream_name = stream_ref.get("name", "")
if stream_name:
- stream_name_to_group[stream_name] = group_name
+ _assign(stream_name, group_name)
elif isinstance(stream_ref, str):
# If not resolved (shouldn't happen normally), extract name from ref path
# e.g., "#/definitions/my_stream" -> "my_stream"
- if stream_ref.startswith("#/definitions/"):
- stream_name = stream_ref.split("/")[-1]
- stream_name_to_group[stream_name] = group_name
+ resolved_name = _resolve_ref_to_stream_name(stream_ref)
+ if resolved_name:
+ _assign(resolved_name, group_name)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@airbyte_cdk/sources/declarative/concurrent_declarative_source.py` around
lines 552 - 565, When iterating stream_groups, do not treat string refs as
opaque keys: resolve any string stream_ref (e.g., "#/definitions/..") to the
actual stream definition dict and read its "name" field (use the same
reference-resolution helper or definitions map used elsewhere) and then assign
stream_name_to_group[stream_name] = group_name; if a stream_name is already
present mapped to a different group, reject with a clear exception or error (no
silent last-write-wins). Update the logic that handles isinstance(stream_ref,
str) to perform resolution to a dict, extract the runtime "name", and
enforce/raise on conflicting group assignments for the same stream_name.
- Add stream_name_to_group parameter to ModelToComponentFactory.__init__()
- Add set_stream_name_to_group() method for post-init configuration
- Factory now looks up block_simultaneous_read from its own mapping
- Remove config injection hack from ConcurrentDeclarativeSource.streams()
- Update tests to use factory-based approach instead of extra fields

Co-Authored-By: unknown <>
…oded dict

- Test now defines stream_groups with references in the manifest YAML
- Uses _build_stream_name_to_group() to derive the mapping from manifest
- Removed test_set_stream_name_to_group (redundant with the manifest-based test)
- Added ConcurrentDeclarativeSource import for _build_stream_name_to_group

Co-Authored-By: unknown <>
Child streams that depend on parent streams should not be in the same group, as this would cause a deadlock (child needs to read parent).

Co-Authored-By: unknown <>
Co-Authored-By: unknown <>
- Factory now owns the stream_groups resolution via set_stream_groups(manifest)
- ConcurrentDeclarativeSource just calls factory.set_stream_groups(manifest)
- Removed _build_stream_name_to_group from ConcurrentDeclarativeSource
- Updated tests to use factory's _build_stream_name_to_group directly

Co-Authored-By: unknown <>
…of factory

- Removed _build_stream_name_to_group, set_stream_groups, _stream_name_to_group from factory
- Factory no longer knows about stream_groups at all
- Added _apply_stream_groups to ConcurrentDeclarativeSource: creates streams first, then sets block_simultaneous_read on matching DefaultStream instances
- Added block_simultaneous_read setter on DefaultStream
- Replaced mock-based tests with parametrized tests using real DefaultStream instances

Co-Authored-By: unknown <>
Brian Lai (@brianjlai) Anatolii Yatsuk (@tolik0) I'm not sure who is working on this. I see commits being made with no authors. However, flagging that this is now being swarmed here. I can try to test these changes if they're ready. Please let me know.
Replace hasattr chain in ConcurrentReadProcessor._collect_all_parent_stream_names with DefaultStream.get_partition_router() that safely traverses the internal partition_generator -> stream_slicer -> partition_router chain using isinstance checks. Co-Authored-By: unknown <>
_apply_stream_groups now checks that no stream shares a group with any of its parent streams (via get_partition_router). Raises ValueError at config time if a deadlock-causing configuration is detected. Co-Authored-By: unknown <>
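The config-time validation this commit describes can be sketched as follows. Helper name and data shapes are assumed for illustration, not the actual `_apply_stream_groups` code:

```python
from typing import Dict, List


def validate_stream_groups(groups: Dict[str, str], parents: Dict[str, List[str]]) -> None:
    """Raise if any stream shares a non-empty blocking group with one of its parents."""
    for stream, group in groups.items():
        if not group:
            continue  # ungrouped streams cannot deadlock on a group
        for parent in parents.get(stream, []):
            if groups.get(parent) == group:
                raise ValueError(
                    f"Stream '{stream}' and its parent '{parent}' share blocking group "
                    f"'{group}'; this configuration would deadlock."
                )


# Parent in a different group: fine.
validate_stream_groups({"child": "g1", "parent": "g2"}, {"child": ["parent"]})

# Parent in the same group: rejected at config time.
try:
    validate_stream_groups({"child": "g1", "parent": "g1"}, {"child": ["parent"]})
except ValueError:
    pass
else:
    raise AssertionError("expected ValueError for shared parent/child group")
```

Failing fast here is what prevents the runtime deadlock: the child would mark its parent's group active and then wait forever for the parent it needs to read.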
… done Adds a safety check in is_done() that raises AirbyteTracedException (system_error) if streams remain in the partition generation queue after all streams are marked done. Also moves inline imports to module level and updates test mocks to use DefaultStream with get_partition_router(). Co-Authored-By: unknown <>
…d concurrent_declarative_source.py Co-Authored-By: unknown <>
…ct parent streams Co-Authored-By: unknown <>
…artition_router() Co-Authored-By: unknown <>
Co-Authored-By: unknown <>
…ps check, and get_partition_router Co-Authored-By: unknown <>
…rents Co-Authored-By: unknown <>
Brian Lai (brianjlai)
left a comment
I mentioned a few things and one potential interesting change to how we present the StreamGroup component within the ConcurrentSource component. Not blocking and ultimately not something that must be done, but a thought I had.
From a functional side I think we're all set. But again, I think we need this to be a dev CDK version first so we can roll it out to Intercom safely w/o having this go out to all connectors during the automatic version bump that goes out.
      description: A description of the connector. It will be presented on the Source documentation page.
    additionalProperties: false
    definitions:
      StreamGroup:
nit: can we move this so it's alphabetized instead of just at the top of definitions
        "$ref": "#/definitions/ConcurrencyLevel"
      api_budget:
        "$ref": "#/definitions/HTTPAPIBudget"
      stream_groups:
an interesting idea I had was instead of needing to define a separate stream_groups field on the DeclarativeSource component, what if we were to change it so that the streams list can take in an additional type in the anyOf for "$ref": "#/definitions/StreamGroup". And so therefore StreamGroup components can just be defined in there. And since right now stream_groups is a map, we'd have to adjust the StreamGroup component to also add a name field since we won't have a key if we remove this stream_groups mapping
It does make how we implement _apply_stream_groups a little more complicated because we need to check the isinstance for StreamGroup, but it also feels like a natural place to place the StreamGroup since a StreamGroup is just a list of grouped streams. But this feels like it might be a little bit of a cleaner interface if we can make it work.
Anatolii Yatsuk (@tolik0) what do you think about this design?
Summary
Adds a block_simultaneous_read feature to prevent multiple streams from running concurrently when they share the same resource. The feature is configured via a top-level stream_groups structure in the manifest (rather than per-stream properties).

Interface

- stream_groups is a top-level manifest property (alongside streams, definitions, check, etc.)
- Each group has a name (e.g., crm_objects), a list of stream $ref references, and an action
- BlockSimultaneousSyncsAction is the only action type for now

Implementation
Schema (declarative_component_schema.yaml): Removed per-stream block_simultaneous_read property from DeclarativeStream. Added top-level stream_groups with StreamGroup and BlockSimultaneousSyncsAction definitions.

Pydantic models (declarative_component_schema.py): Added BlockSimultaneousSyncsAction and StreamGroup classes. Added stream_groups: Optional[Dict[str, StreamGroup]] to DeclarativeSource1/DeclarativeSource2.

ConcurrentDeclarativeSource._apply_stream_groups(): Resolves stream groups from actual stream instances after stream creation. Validates that no stream shares a group with any of its parent streams (deadlock prevention). Sets block_simultaneous_read on matching DefaultStream instances.

DefaultStream.get_partition_router(): New helper method that safely traverses the partition_generator → stream_slicer → partition_router chain using isinstance checks, replacing the hasattr chains in ConcurrentReadProcessor.

ConcurrentReadProcessor (core blocking logic): Uses group-based deferral/retry with parent-child awareness. Added is_done() safety check that raises AirbyteTracedException if streams remain in the partition generation queue after all streams are marked done.

Blocking Behavior
First use case: source-intercom — prevents duplicate concurrent requests to the companies endpoint.
Resolves: https://github.com/airbytehq/oncall/issues/8346
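For illustration only, here is an assumed stream_groups fragment (shown as a Python dict, as it would look after $ref resolution) and the name-to-group mapping a source could derive from it. The group and stream names are hypothetical:

```python
# Assumed manifest fragment after reference resolution; not a real connector config.
manifest = {
    "stream_groups": {
        "crm_objects": {
            "type": "StreamGroup",
            "streams": [{"name": "companies"}, {"name": "contacts"}],
            "action": {"type": "BlockSimultaneousSyncsAction"},
        }
    }
}

# Derive stream name -> group name, mirroring the mapping logic discussed in this PR.
stream_name_to_group = {
    stream["name"]: group_name
    for group_name, group in manifest["stream_groups"].items()
    for stream in group["streams"]
}
assert stream_name_to_group == {"companies": "crm_objects", "contacts": "crm_objects"}
```

Streams mapped to the same group name then get matching block_simultaneous_read values, which is what the blocking logic keys on.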
Updates since last revision

- get_partition_router() helper on DefaultStream. Replaces messy hasattr chains in ConcurrentReadProcessor._collect_all_parent_stream_names() with a clean method that uses isinstance checks to traverse StreamSlicerPartitionGenerator → ConcurrentPerPartitionCursor → partition_router.
- Deadlock validation in _apply_stream_groups(). Raises ValueError at config time if a stream and any of its parent streams are in the same blocking group.
- is_done() safety check. Raises AirbyteTracedException (system_error) if _stream_instances_to_start_partition_generation is not empty after all streams are marked done — catches stuck-stream bugs at runtime.
- Inline imports (ConcurrentPerPartitionCursor, StreamSlicerPartitionGenerator, PartitionRouter, SubstreamPartitionRouter) are now at the top of files per Python coding standards.

Review & Testing Checklist for Human

- Verify deadlock validation: a stream grouped with one of its parents should raise ValueError at config time with a clear message.
- get_partition_router() handles all stream types: test with both DefaultStream (with StreamSlicerPartitionGenerator) and legacy DeclarativeStream paths. Confirm it returns None for streams without partition routers.
- is_done() check doesn't mask real issues: if the safety check triggers, investigate the root cause (why streams remained in the queue) rather than just fixing the symptom.
- Run a connector with the stream_groups config format to confirm end-to-end blocking behavior and verify no deadlocks occur.
- Module-level imports (ConcurrentPerPartitionCursor, StreamSlicerPartitionGenerator, SubstreamPartitionRouter) should not cause circular dependencies in production connectors.

Recommended test plan:

- Define stream_groups referencing 2+ streams via $ref, read with ConcurrentDeclarativeSource, and verify streams in the same group are read sequentially.

Notes

- Manifests without stream_groups continue to work unchanged.
- ConcurrentReadProcessor still uses string-based group identifiers, so the blocking logic is unchanged.
- test_read_with_concurrent_and_synchronous_streams_with_concurrent_state fails with SQLite locking issues (pre-existing, not caused by these changes).

Summary by CodeRabbit
New Features
- get_partition_router() on DefaultStream for safe partition router access.
- is_done() to detect stuck streams in partition generation queue.

Bug Fixes
Tests