diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 84aaa6c53..44c3918ab 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -3886,6 +3886,16 @@ definitions: type: array items: type: string + skip_rows_before_header: + title: Skip Rows Before Header + description: The number of rows to skip before the header row. For example, if the header row is on the 3rd row, enter 2 in this field. + type: integer + default: 0 + skip_rows_after_header: + title: Skip Rows After Header + description: The number of rows to skip after the header row. + type: integer + default: 0 AsyncJobStatusMap: description: Matches the api job status to Async Job Status. type: object diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 7acb5c1e2..a7c0e31b2 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -104,24 +104,35 @@ class CsvParser(Parser): encoding: Optional[str] = "utf-8" delimiter: Optional[str] = "," set_values_to_none: Optional[List[str]] = None + skip_rows_before_header: int = 0 + skip_rows_after_header: int = 0 def _get_delimiter(self) -> Optional[str]: - """ - Get delimiter from the configuration. Check for the escape character and decode it. - """ + """Get delimiter from the configuration. Check for the escape character and decode it.""" if self.delimiter is not None: if self.delimiter.startswith("\\"): self.delimiter = self.delimiter.encode("utf-8").decode("unicode_escape") return self.delimiter + @staticmethod + def _skip_rows(text_data: io.TextIOBase, rows_to_skip: int) -> None: + """Skip a specified number of rows from the current position in the text stream.""" + for _ in range(rows_to_skip): + text_data.readline() + def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE: - """ - Parse CSV data from decompressed bytes. - """ + """Parse CSV data from decompressed bytes.""" text_data = TextIOWrapper(data, encoding=self.encoding) # type: ignore + self._skip_rows(text_data, self.skip_rows_before_header) reader = csv.DictReader(text_data, delimiter=self._get_delimiter() or ",") + # After DictReader reads the header (first row it encounters), skip additional rows + # We need to handle skip_rows_after_header by consuming rows from the reader + skipped_after_header = 0 for row in reader: + if skipped_after_header < self.skip_rows_after_header: + skipped_after_header += 1 + continue if self.set_values_to_none: row = {k: (None if v in self.set_values_to_none else v) for k, v in row.items()} yield row diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 5d2f0521f..ecc1556af 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,5 +1,3 @@ -# Copyright (c) 2025 Airbyte, Inc., all rights reserved. - # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -1200,6 +1198,16 @@ class CsvDecoder(BaseModel): encoding: Optional[str] = "utf-8" delimiter: Optional[str] = "," set_values_to_none: Optional[List[str]] = None + skip_rows_before_header: Optional[int] = Field( + 0, + description="The number of rows to skip before the header row. For example, if the header row is on the 3rd row, enter 2 in this field.", + title="Skip Rows Before Header", + ) + skip_rows_after_header: Optional[int] = Field( + 0, + description="The number of rows to skip after the header row.", + title="Skip Rows After Header", + ) class AsyncJobStatusMap(BaseModel): diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 2bd7d268d..95788fa37 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2672,6 +2672,12 @@ def _get_parser(model: BaseModel, config: Config) -> Parser: encoding=model.encoding, delimiter=model.delimiter, set_values_to_none=model.set_values_to_none, + skip_rows_before_header=model.skip_rows_before_header + if model.skip_rows_before_header + else 0, + skip_rows_after_header=model.skip_rows_after_header + if model.skip_rows_after_header + else 0, ) elif isinstance(model, GzipDecoderModel): return GzipParser( diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index d92d6c605..3e1d02983 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -332,6 +332,83 @@ def test_composite_raw_decoder_csv_parser_without_mocked_response(): thread.join(timeout=5) # ensure thread is cleaned up +def _generate_csv_with_metadata( + metadata_lines: int = 0, + post_header_lines: int = 0, + encoding: str = "utf-8", + delimiter: str = ",", +) -> bytes: + """Generate CSV bytes with metadata lines before and/or after the header.""" + lines = [] + for i in range(metadata_lines): + lines.append(f"# metadata line {i + 1}") + header = delimiter.join(["id", "name", "age"]) + lines.append(header) + for i in range(post_header_lines): + lines.append(f"# post-header line {i + 1}") + lines.append(delimiter.join(["1", "John", "28"])) + lines.append(delimiter.join(["2", "Alice", "34"])) + return ("\n".join(lines) + "\n").encode(encoding) + + +@pytest.mark.parametrize( + "skip_before,skip_after,metadata_lines,post_header_lines,expected_count", + [ + pytest.param(0, 0, 0, 0, 2, id="no_skip_rows_default_behavior"), + pytest.param(3, 0, 3, 0, 2, id="skip_3_rows_before_header"), + pytest.param(0, 2, 0, 2, 2, id="skip_2_rows_after_header"), + pytest.param(2, 1, 2, 1, 2, id="skip_both_before_and_after"), + pytest.param(1, 0, 1, 0, 2, id="skip_1_row_before_header"), + ], +) +def test_csv_parser_skip_rows( + requests_mock, + skip_before: int, + skip_after: int, + metadata_lines: int, + post_header_lines: int, + expected_count: int, +): + """Verify CsvParser skips metadata rows before and/or after the header.""" + csv_content = _generate_csv_with_metadata( + metadata_lines=metadata_lines, + post_header_lines=post_header_lines, + ) + requests_mock.register_uri("GET", "https://airbyte.io/", content=csv_content) + response = requests.get("https://airbyte.io/", stream=True) + + parser = CsvParser( + skip_rows_before_header=skip_before, + skip_rows_after_header=skip_after, + ) + composite_raw_decoder = CompositeRawDecoder(parser=parser) + parsed_records = list(composite_raw_decoder.decode(response)) + + assert len(parsed_records) == expected_count + assert parsed_records[0] == {"id": "1", "name": "John", "age": "28"} + assert parsed_records[1] == {"id": "2", "name": "Alice", "age": "34"} + + +def test_csv_parser_skip_rows_before_header_with_tsv(requests_mock): + """Verify skip_rows_before_header works with TSV (tab-delimited) data, matching the reporter's use case.""" + csv_content = _generate_csv_with_metadata( + metadata_lines=3, + delimiter="\t", + ) + requests_mock.register_uri("GET", "https://airbyte.io/", content=csv_content) + response = requests.get("https://airbyte.io/", stream=True) + + parser = CsvParser( + delimiter="\t", + skip_rows_before_header=3, + ) + composite_raw_decoder = CompositeRawDecoder(parser=parser) + parsed_records = list(composite_raw_decoder.decode(response)) + + assert len(parsed_records) == 2 + assert parsed_records[0] == {"id": "1", "name": "John", "age": "28"} + + def test_given_response_already_consumed_when_decode_then_no_data_is_returned(requests_mock): requests_mock.register_uri( "GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode()